This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 56e68b16d81a176e29ecc0692464db155ba9db00
Author: xyuanlu <[email protected]>
AuthorDate: Tue Sep 24 07:33:05 2024 +0800

    Add an end to end test for helix gateway (#2922)
    
    Add an end to end test for helix gateway
---
 .../HelixGatewayServicePollModeChannel.java        |  11 +-
 .../participant/HelixGatewayParticipant.java       |   5 +
 .../gateway/service/GatewayServiceManager.java     |  22 +-
 .../gateway/util/GatewayCurrentStateCache.java     |   2 +
 .../apache/helix/gateway/util/PollChannelUtil.java |   1 +
 .../util/StateTransitionMessageTranslateUtil.java  |  12 +-
 .../integration/TestFilePullChannelE2E.java        | 255 +++++++++++++++++++++
 7 files changed, 292 insertions(+), 16 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
index 3acd9e83b..77caf0c17 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
@@ -19,7 +19,9 @@ package org.apache.helix.gateway.channel;
  * under the License.
  */
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
@@ -110,7 +112,6 @@ public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceCh
         boolean prevLiveness =
             _livenessResults.get(clusterName) != null && 
_livenessResults.get(clusterName).get(instanceName);
         boolean liveness = fetchInstanceLivenessStatus(clusterName, 
instanceName);
-
         if (prevLiveness && !liveness) {  // previously connected, now 
disconnected
           logger.warn("Host {} is not healthy, sending event to gateway 
manager", instanceName);
           pushClientEventToGatewayManager(_manager,
@@ -172,7 +173,6 @@ public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceCh
   public void stop() {
     logger.info("Stopping Helix Gateway Service Poll Mode Channel...");
     // Shutdown the scheduler gracefully when done (e.g., on app termination)
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
       _scheduler.shutdown();
       try {
         if (!_scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
@@ -181,7 +181,12 @@ public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceCh
       } catch (InterruptedException e) {
         _scheduler.shutdownNow();
       }
-    }));
+    // remove the target state file when the shard state channel is file based
+    if (_shardStateChannelType == 
GatewayServiceChannelConfig.ChannelType.FILE) {
+      File file = new File(_targetStateFilePath);
+      boolean res = file.delete();
+      logger.info("Delete target state file: " + file + " res :" + res);
+    }
   }
 
   @Override
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
index 8dd04644b..4d3b975c0 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
+import org.apache.helix.gateway.channel.HelixGatewayServicePollModeChannel;
 import org.apache.helix.gateway.service.GatewayServiceManager;
 import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
 import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
@@ -34,6 +35,8 @@ import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -43,6 +46,7 @@ import 
org.apache.helix.participant.statemachine.StateTransitionError;
  * transitions signaled by remote participant.
  */
 public class HelixGatewayParticipant implements HelixManagerStateListener {
+  private static final Logger logger = 
LoggerFactory.getLogger(HelixGatewayParticipant.class);
   public static final String UNASSIGNED_STATE = "UNASSIGNED";
   private final HelixGatewayServiceChannel _gatewayServiceChannel;
   private final HelixManager _helixManager;
@@ -113,6 +117,7 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
    * Completes the state transition with the given transitionId.
    */
   public void completeStateTransition(String resourceId, String shardId, 
String currentState) {
+    logger.info("Completing state transition for shard: {}{} to state: {}", 
resourceId, shardId, currentState);
     String concatenatedShardName = resourceId + shardId;
     CompletableFuture<String> future = 
_stateTransitionResultMap.get(concatenatedShardName);
     if (future != null) {
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index e4d207fd7..9d4430ba1 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -88,11 +88,10 @@ public class GatewayServiceManager {
    */
   public void setGatewayServiceChannel(HelixGatewayServiceChannel channel) {
     if (_gatewayServiceChannel != null) {
-      _gatewayServiceChannel.stop();
-      return;
+      throw new IllegalStateException(
+          "Gateway service channel is already set, it can only be set once.");
     }
-    throw new IllegalStateException(
-        "Gateway service channel is already set, it can only be set once.");
+    _gatewayServiceChannel = channel;
   }
 
   /**
@@ -163,6 +162,7 @@ public class GatewayServiceManager {
 
     @Override
     public void run() {
+      System.out.println("Processing state transition result " + 
_event.getInstanceName());
       HelixGatewayParticipant participant =
           getHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName());
       if (participant == null) {
@@ -201,7 +201,11 @@ public class GatewayServiceManager {
   public void stopManager() {
     _connectionEventProcessor.shutdown();
     _participantStateTransitionResultUpdator.shutdown();
-    _helixGatewayParticipantMap.clear();
+    _helixGatewayParticipantMap.forEach((clusterName, participantMap) -> {
+      participantMap.forEach((instanceName, participant) -> {
+        participant.disconnect();
+      });
+    });
   }
 
   public void startService() throws IOException {
@@ -231,9 +235,13 @@ public class GatewayServiceManager {
     HelixGatewayParticipant participant = 
getHelixGatewayParticipant(clusterName, instanceName);
     if (participant != null) {
       participant.disconnect();
-      _helixGatewayParticipantMap.get(clusterName).remove(instanceName);
+      if (_helixGatewayParticipantMap.containsKey(clusterName)) {
+        _helixGatewayParticipantMap.get(clusterName).remove(instanceName);
+      }
+    }
+    if (_currentStateCacheMap.containsKey(clusterName)) {
+      
_currentStateCacheMap.get(clusterName).removeInstanceTargetDataFromCache(instanceName);
     }
-    
_currentStateCacheMap.get(clusterName).removeInstanceTargetDataFromCache(instanceName);
   }
 
   private HelixGatewayParticipant getHelixGatewayParticipant(String 
clusterName,
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
index 2b8b1c978..bbec7f3e4 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java
@@ -116,6 +116,7 @@ public class GatewayCurrentStateCache {
    * example : 
{"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}}
    */
   public synchronized ObjectNode serializeTargetAssignmentsToJSONNode() {
+    ObjectNode root = mapper.createObjectNode();
     for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
       root.set(entry.getKey(), entry.getValue().toJSONNode());
     }
@@ -183,6 +184,7 @@ public class GatewayCurrentStateCache {
      * @return a JSON object representing the shard state map. Example: 
{"shard1":"ONLINE","shard2":"OFFLINE"}
      */
     public synchronized ObjectNode toJSONNode() {
+      ObjectNode root = mapper.createObjectNode();
       for (Map.Entry<String, Map<String, String>> entry : 
_stateMap.entrySet()) {
         String resource = entry.getKey();
         ObjectNode resourceNode = mapper.createObjectNode();
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
index 26de6db0f..2e8d277a8 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
@@ -69,6 +69,7 @@ public class PollChannelUtil {
   public static void flushAssignmentToFile(String targetAssignment, String 
filePath) {
     try (FileWriter fileWriter = new FileWriter(filePath)) {
       fileWriter.write(targetAssignment);
+      fileWriter.close();
     } catch (IOException e) {
       logger.warn("Failed to write to file: " + filePath, e);
     }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index a2d07085b..5fc319737 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -130,7 +130,7 @@ public final class StateTransitionMessageTranslateUtil {
    * @param clusterName the cluster name
    * @return GatewayServiceEvent
    */
-  public static GatewayServiceEvent translateClientCloseToEvent(String 
instanceName, String clusterName) {
+  public static GatewayServiceEvent translateClientCloseToEvent(String 
clusterName, String instanceName) {
     GatewayServiceEvent.GateWayServiceEventBuilder builder =
         new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName(
             clusterName).setParticipantName(instanceName);
@@ -144,7 +144,7 @@ public final class StateTransitionMessageTranslateUtil {
    * @param shardStateMap
    * @return
    */
-  public static GatewayServiceEvent translateCurrentStateChangeToEvent(String 
instanceName, String clusterName,
+  public static GatewayServiceEvent translateCurrentStateChangeToEvent(String 
clusterName, String instanceName,
       Map<String, Map<String, String>> shardStateMap) {
     List<GatewayServiceEvent.StateTransitionResult> stResult = new 
ArrayList<>();
     shardStateMap.forEach((resourceName, value) -> value.forEach((key, value1) 
-> {
@@ -165,12 +165,12 @@ public final class StateTransitionMessageTranslateUtil {
    * @param shardStateMap the initial state of shards on the participant. 
Could be empty map
    * @return
    */
-  public static GatewayServiceEvent 
translateCurrentStateDiffToInitConnectEvent(String instanceName, String 
clusterName,
+  public static GatewayServiceEvent 
translateCurrentStateDiffToInitConnectEvent(String clusterName, String 
instanceName,
       Map<String, Map<String, String>> shardStateMap) {
     GatewayServiceEvent.GateWayServiceEventBuilder builder =
-        new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
-            
clusterName).setParticipantName(instanceName).setShardStateMap(shardStateMap);
+        new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(clusterName)
+            .setParticipantName(instanceName)
+            .setShardStateMap(shardStateMap);
     return builder.build();
   }
-
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
new file mode 100644
index 000000000..3066dcc91
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java
@@ -0,0 +1,255 @@
+package org.apache.helix.gateway.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.gateway.base.HelixGatewayTestBase;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static 
org.apache.helix.gateway.channel.GatewayServiceChannelConfig.ChannelMode.*;
+
+
+public class TestFilePullChannelE2E extends HelixGatewayTestBase {
+
+  private static final String CLUSTER_NAME = "CLUSTER_" + 
TestFilePullChannelE2E.class.getSimpleName();
+  private static final int START_NUM_NODE = 3;
+  private static final String TEST_DB = "TestDB";
+  private static final String TEST_STATE_MODEL = "OnlineOffline";
+  private static final String CONTROLLER_PREFIX = "controller";
+  private static final String currentStatePath = "tmpcurrentState";
+  private static final String targetStatePath = "tmptargetState";
+  GatewayServiceManager manager1, manager2, manager0;
+  ArrayList<Path> csPaths = new ArrayList<Path>();
+  ArrayList<Path> targetPaths = new ArrayList<Path>();
+  ArrayList<Path> healthPaths = new ArrayList<Path>();
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() {
+    super.beforeClass();
+
+    // Set up the Helix cluster
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(CLUSTER_NAME);
+    
clusterConfig.getRecord().setSimpleField(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN,
 "true");
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Start the controller
+    String controllerName = CONTROLLER_PREFIX + '_' + CLUSTER_NAME;
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    // Enable best possible assignment persistence
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  @Test
+  public void testE2E() throws Exception {
+    // create temp files for the current state, target state and health check of each instance
+    try {
+      for (int i = 0; i < START_NUM_NODE; i++) {
+        csPaths.add(createTempFile(currentStatePath + i, ".txt", ""));
+        targetPaths.add(createTempFile(targetStatePath + i, ".txt", ""));
+        String currentTime = String.valueOf(System.currentTimeMillis());
+        String content = "{\"IsAlive\":" + true + ",\"LastUpdateTime\":" + 
currentTime + "}";
+        healthPaths.add(createTempFile("tmphealthCheck" + i, ".txt", content));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String fileName0 = healthPaths.get(0).toAbsolutePath().toString();
+    String fileName1 = healthPaths.get(1).toAbsolutePath().toString();
+    String fileName2 = healthPaths.get(2).toAbsolutePath().toString();
+
+    GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder =
+        new 
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setChannelMode(POLL_MODE)
+            
.setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.FILE)
+            
.setShardStateProcessorType(GatewayServiceChannelConfig.ChannelType.FILE)
+            .setPollIntervalSec(1) // set a larger number to avoid recurrent 
polling
+            .setPollStartDelaySec(1)
+            .setTargetFileUpdateIntervalSec(1);
+
+    // create empty file for shard state
+
+    // create 3 manager instances
+    manager0 = new GatewayServiceManager(ZK_ADDR,
+        
builder.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH,
+                csPaths.get(0).toAbsolutePath().toString())
+            
.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH,
+                targetPaths.get(0).toAbsolutePath().toString())
+            .setHealthCheckEndpointMap(Map.of(CLUSTER_NAME, 
Map.of("instance0", fileName0)))
+            .build());
+    manager1 = new GatewayServiceManager(ZK_ADDR,
+        
builder.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH,
+                csPaths.get(1).toAbsolutePath().toString())
+            
.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH,
+                targetPaths.get(1).toAbsolutePath().toString())
+            .setHealthCheckEndpointMap(Map.of(CLUSTER_NAME, 
Map.of("instance1", fileName1)))
+            .build());
+    manager2 = new GatewayServiceManager(ZK_ADDR,
+        
builder.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH,
+                csPaths.get(2).toAbsolutePath().toString())
+            
.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH,
+                targetPaths.get(2).toAbsolutePath().toString())
+            .setHealthCheckEndpointMap(Map.of(CLUSTER_NAME, 
Map.of("instance2", fileName2)))
+            .build());
+
+    System.out.println("Starting all managers");
+    manager0.startService();
+    manager1.startService();
+    manager2.startService();
+
+    // verify we see live instances
+    verifyInstances(CLUSTER_NAME, List.of("instance0", "instance1", 
"instance2"));
+
+    // create a DB on the cluster
+    createDB();
+
+    // read the target state file and verify the target state is updated
+    verifyTargetState();
+
+    // write current state to file
+    for (int i = 0; i < 3; i++) {
+      String content =
+          "{\"" + CLUSTER_NAME + "\" : { \"instance" + i + "\" : { \"TestDB\" 
: {\"TestDB_0\" : \"ONLINE\" }}}} ";
+      Files.write(csPaths.get(i), content.getBytes());
+    }
+
+    // check no pending messages for partitions
+    verifyNoPendingMessages(List.of("instance0", "instance1", "instance2"));
+
+    // change health state to false on one instance
+    String currentTime = String.valueOf(System.currentTimeMillis());
+    String content = "{\"IsAlive\":" + false + ",\"LastUpdateTime\":" + 
currentTime + "}";
+    Files.write(healthPaths.get(0), content.getBytes());
+
+    // check live instance for that instance is gone
+    Assert.assertTrue(TestHelper.verify(() -> {
+      List<String> liveInstance = getLiveInstances();
+      return !liveInstance.contains("instance0") && 
liveInstance.contains("instance1") && liveInstance.contains(
+          "instance2");
+    }, TestHelper.WAIT_DURATION));
+
+    // stop all managers
+    manager0.stopService();
+    manager1.stopService();
+    manager2.stopService();
+
+
+    // check target state files are gone
+    for (int i = 0; i < 3; i++) {
+      Assert.assertFalse(Files.exists(targetPaths.get(i)));
+    }
+
+    // check all live instances are gone
+    Assert.assertTrue(TestHelper.verify(() -> {
+      List<String> liveInstance = getLiveInstances();
+      return !liveInstance.contains("instance0") && 
!liveInstance.contains("instance1") && !liveInstance.contains(
+          "instance2");
+    }, TestHelper.WAIT_DURATION));
+
+    for (int i = 0; i < 3; i++) {
+      try {
+        Files.deleteIfExists(csPaths.get(i));
+        Files.deleteIfExists(targetPaths.get(i));
+        Files.deleteIfExists(healthPaths.get(i));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+  private void verifyInstances(String clusterName, List<String> instance0) 
throws Exception {
+    for (String instance : instance0) {
+      Assert.assertTrue(TestHelper.verify(
+          () -> 
_gSetupTool.getClusterManagementTool().getInstancesInCluster(clusterName).contains(instance),
+          TestHelper.WAIT_DURATION));
+    }
+  }
+
+  private List<String> getLiveInstances() {
+    ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
new ZkBaseDataAccessor<>(_gZkClient));
+    PropertyKey liveInstances = dataAccessor.keyBuilder().liveInstances();
+    return dataAccessor.getChildNames(liveInstances);
+  }
+
+  private void verifyNoPendingMessages(List<String> participants) throws 
Exception {
+    ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
new ZkBaseDataAccessor<>(_gZkClient));
+    for (String participant : participants) {
+      PropertyKey messagesNode = 
dataAccessor.keyBuilder().messages(participant);
+      Assert.assertTrue(
+          TestHelper.verify(() -> 
dataAccessor.getChildNames(messagesNode).isEmpty(), TestHelper.WAIT_DURATION));
+    }
+  }
+
+  private void verifyTargetState() throws Exception {
+    for (int i = 0; i < 3; i++) {
+      int finalI = i;
+      Assert.assertTrue(TestHelper.verify(() -> {
+        String content = Files.readString(targetPaths.get(finalI));
+        return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}}");
+      }, TestHelper.WAIT_DURATION));
+    }
+  }
+
+  public static Path createTempFile(String prefix, String suffix, String 
content) throws IOException {
+    // Create a temporary file
+    Path tempFile = Files.createTempFile(prefix, suffix);
+
+    // Write content to the temporary file
+    Files.write(tempFile, content.getBytes());
+
+    return tempFile;
+  }
+
+  private void createDB() {
+    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, 
List.of("instance0", "instance1", "instance2"),
+        TEST_STATE_MODEL, 1, 3);
+
+    _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(new 
HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
+  }
+}

Reply via email to