This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this push:
new 71b4a9a31 Refactor and remove mock classes (#2841)
71b4a9a31 is described below
commit 71b4a9a3187b3dade1c6f2710fa9b418811a8479
Author: xyuanlu <[email protected]>
AuthorDate: Mon Jul 22 13:43:00 2024 -0700
Refactor and remove mock classes (#2841)
Refactor and remove mock classes
---
.../org/apache/helix/gateway/HelixGatewayMain.java | 74 --------------
.../grpcservice/HelixGatewayServiceService.java | 2 +-
.../helix/gateway/mock/ControllerManager.java | 111 ---------------------
.../apache/helix/gateway/mock/MockApplication.java | 100 -------------------
.../helix/gateway/mock/MockProtoRequest.java | 56 -----------
.../helix/gateway/mock/MockProtoResponse.java | 15 ---
.../helix/gateway/service/ClusterManager.java | 31 +-----
.../HelixGatewayOnlineOfflineStateModel.java | 81 ---------------
.../helix/gateway/service/HelixGatewayService.java | 20 ++--
.../HelixGatewayOnlineOfflineStateModel.java | 64 ++++++++++++
...HelixGatewayOnlineOfflineStateModelFactory.java | 3 +-
11 files changed, 81 insertions(+), 476 deletions(-)
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
deleted file mode 100644
index 0577aba02..000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.helix.gateway;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.mock.ControllerManager;
-import org.apache.helix.gateway.mock.MockApplication;
-import org.apache.helix.gateway.service.HelixGatewayService;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.OnlineOfflineSMD;
-import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
-
-public final class HelixGatewayMain {
-
- private static final String ZK_ADDRESS = "localhost:2181";
- private static final String CLUSTER_NAME = "TEST_CLUSTER";
-
- private HelixGatewayMain() {
- }
-
- public static void main(String[] args) throws InterruptedException {
- RealmAwareZkClient zkClient = new ZkClient(ZK_ADDRESS);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
- HelixAdmin admin = new ZKHelixAdmin(zkClient);
- if (admin.getClusters().isEmpty()) {
- admin.addCluster(CLUSTER_NAME);
- admin.addStateModelDef(CLUSTER_NAME, "OnlineOffline",
OnlineOfflineSMD.build());
- }
-
- ClusterConfig clusterConfig =
configAccessor.getClusterConfig(CLUSTER_NAME);
- clusterConfig.getRecord().setSimpleField("allowParticipantAutoJoin",
"true");
- configAccessor.updateClusterConfig(CLUSTER_NAME, clusterConfig);
-
- String resourceName = "Test_Resource";
-
- if (admin.getResourceIdealState(CLUSTER_NAME, resourceName) == null) {
- admin.addResource(CLUSTER_NAME, resourceName, 2, "OnlineOffline",
- IdealState.RebalanceMode.FULL_AUTO.name(),
-
"org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
- admin.rebalance(CLUSTER_NAME, resourceName, 3);
- }
-
- ControllerManager controllerManager =
- new ControllerManager(ZK_ADDRESS, CLUSTER_NAME, "CONTROLLER",
InstanceType.CONTROLLER);
- controllerManager.syncStart();
-
- HelixGatewayService service = new HelixGatewayService(ZK_ADDRESS);
- service.start();
-
- List<MockApplication> mockApplications = new ArrayList<>();
- for (int i = 0; i < 6; i++) {
- MockApplication mockApplication =
- new MockApplication("INSTANCE_" + i, CLUSTER_NAME,
service.getClusterManager());
- service.registerParticipant(mockApplication);
- mockApplications.add(mockApplication);
- }
-
- Thread.sleep(100000000);
-
- MockApplication mockApplication = mockApplications.get(3);
- service.deregisterParticipant(mockApplication.getClusterName(),
- mockApplication.getInstanceName());
-
- controllerManager.syncStop();
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
index 441fba952..286cf2001 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
@@ -1,8 +1,8 @@
package org.apache.helix.gateway.grpcservice;
+import io.grpc.stub.StreamObserver;
import proto.org.apache.helix.gateway.*;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
-import io.grpc.stub.StreamObserver;
public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase {
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java
deleted file mode 100644
index 3d33874c4..000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.helix.HelixManagerProperty;
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.HelixManagerStateListener;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ControllerManager extends ZKHelixManager implements Runnable {
- private static final int DISCONNECT_WAIT_TIME_MS = 3000;
- private static Logger logger =
LoggerFactory.getLogger(ControllerManager.class);
- private static AtomicLong uid = new AtomicLong(10000);
- private final String _clusterName;
- private final String _instanceName;
- private final InstanceType _type;
- protected CountDownLatch _startCountDown = new CountDownLatch(1);
- protected CountDownLatch _stopCountDown = new CountDownLatch(1);
- protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
- protected boolean _started = false;
- protected Thread _watcher;
- private long _uid;
-
- public ControllerManager(String zkAddr, String clusterName, String
instanceName,
- InstanceType type) {
- super(clusterName, instanceName, type, zkAddr);
- _clusterName = clusterName;
- _instanceName = instanceName;
- _type = type;
- _uid = uid.getAndIncrement();
- }
-
- protected ControllerManager(String clusterName, String instanceName,
InstanceType instanceType,
- String zkAddress, HelixManagerStateListener stateListener,
- HelixManagerProperty helixManagerProperty) {
- super(clusterName, instanceName, instanceType, zkAddress, stateListener,
helixManagerProperty);
- _clusterName = clusterName;
- _instanceName = instanceName;
- _type = instanceType;
- _uid = uid.getAndIncrement();
- }
-
- public void syncStop() {
- _stopCountDown.countDown();
- try {
- _waitStopFinishCountDown.await();
- _started = false;
- } catch (InterruptedException e) {
- logger.error("Interrupted waiting for finish", e);
- }
- }
-
- // This should not be called more than once because HelixManager.connect()
should not be called more than once.
- public void syncStart() {
- if (_started) {
- throw new RuntimeException(
- "Helix Controller already started. Do not call syncStart() more than
once.");
- } else {
- _started = true;
- }
-
- _watcher = new Thread(this);
- _watcher.setName(
- String.format("ClusterManager_Watcher_%s_%s_%s_%d", _clusterName,
_instanceName,
- _type.name(), _uid));
- logger.debug("ClusterManager_watcher_{}_{}_{}_{} started, stacktrace {}",
_clusterName,
- _instanceName, _type.name(), _uid,
Thread.currentThread().getStackTrace());
- _watcher.start();
-
- try {
- _startCountDown.await();
- } catch (InterruptedException e) {
- logger.error("Interrupted waiting for start", e);
- }
- }
-
- @Override
- public void run() {
- try {
- connect();
- _startCountDown.countDown();
- _stopCountDown.await();
- } catch (Exception e) {
- logger.error("exception running controller-manager", e);
- } finally {
- _startCountDown.countDown();
- disconnect();
- _waitStopFinishCountDown.countDown();
- }
- }
-
- /**
- @SuppressWarnings("finalizer")
- @Override public void finalize() {
- _watcher.interrupt();
- try {
- _watcher.join(DISCONNECT_WAIT_TIME_MS);
- } catch (InterruptedException e) {
- logger.error("ClusterManager watcher cleanup in the finalize method was
interrupted.", e);
- } finally {
- if (isConnected()) {
- logger.warn(
- "The HelixManager ({}-{}-{}) is still connected after {} ms wait. This is a
potential resource leakage!",
- _clusterName, _instanceName, _type.name(), DISCONNECT_WAIT_TIME_MS);
- }
- }
- }*/
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java
deleted file mode 100644
index 679f95f05..000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.Executors;
-
-import org.apache.helix.gateway.service.ClusterManager;
-
-public class MockApplication {
- private final ClusterManager _clusterManager;
- private Map<String, Map<String, String>> _currentStates;
- private String _instanceName;
- private String _clusterName;
- private Queue<MockProtoRequest> _requestQueue;
-
- public MockApplication(String instanceName, String clusterName,
ClusterManager clusterManager) {
- _instanceName = instanceName;
- _clusterName = clusterName;
- _currentStates = new HashMap<>();
- _requestQueue = new LinkedList<>();
- _clusterManager = clusterManager;
- Executors.newScheduledThreadPool(1)
- .scheduleAtFixedRate(this::process, 0, 5000,
java.util.concurrent.TimeUnit.MILLISECONDS);
- }
-
- public void process() {
- List<MockProtoResponse> completedMessages = new ArrayList<>();
- synchronized (_requestQueue) {
- while (!_requestQueue.isEmpty()) {
- MockProtoRequest request = _requestQueue.poll();
- switch (request.getMessageType()) {
- case ADD:
- addShard(request.getResourceName(), request.getShardName());
- completedMessages.add(new
MockProtoResponse(request.getMessageId()));
- break;
- case REMOVE:
- removeShard(request.getResourceName(), request.getShardName());
- completedMessages.add(new
MockProtoResponse(request.getMessageId()));
- break;
- case CHANGE_ROLE:
- changeRole(request.getResourceName(), request.getShardName(),
request.getFromState(),
- request.getToState());
- completedMessages.add(new
MockProtoResponse(request.getMessageId()));
- break;
- default:
- System.out.println("Unknown message type: " +
request.getMessageType());
- throw new RuntimeException("Unknown message type: " +
request.getMessageType());
- }
- }
- }
- _clusterManager.receiveResponse(completedMessages, _instanceName);
- }
-
- public void addRequest(MockProtoRequest request) {
- synchronized (_requestQueue) {
- _requestQueue.add(request);
- }
- }
-
- public String getInstanceName() {
- return _instanceName;
- }
-
- public String getClusterName() {
- return _clusterName;
- }
-
- public void join() {
- System.out.println(
- "Joining Mock Application for instance " + _instanceName + " in
cluster " + _clusterName);
- }
-
- public synchronized void addShard(String resourceName, String shardName) {
- System.out.println("ADD | " + shardName + " | " + resourceName + " | " +
_instanceName);
- }
-
- public synchronized void removeShard(String resourceName, String shardName) {
- System.out.println("REMOVE | " + shardName + " | " + resourceName + " | "
+ _instanceName);
- }
-
- public synchronized void changeRole(String resourceName, String shardName,
String fromState,
- String toState) {
- System.out.println(
- "CHANGE ROLE | " + shardName + " | " + resourceName + " | " +
_instanceName + " | "
- + fromState + " -> " + toState);
- _currentStates.computeIfAbsent(resourceName, k -> new
HashMap<>()).put(shardName, toState);
- }
-
- private void sleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java
deleted file mode 100644
index 4e462e254..000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-import org.apache.helix.gateway.constant.MessageType;
-
-public class MockProtoRequest {
-
- private String _messageId;
- private String _instanceName;
-
- private MessageType _messageType;
- private String _resourceName;
- private String _shardName;
-
- private String _fromState;
- private String _toState;
-
- public MockProtoRequest(MessageType messageType, String resourceName, String
shardName,
- String instanceName, String messageId, String fromState, String toState)
{
- System.out.println(
- messageType + " | " + shardName + " | " + resourceName + " | " +
instanceName + " | "
- + messageId + " | " + fromState + " | " + toState);
- _messageId = messageId;
- _instanceName = instanceName;
- _messageType = messageType;
- _resourceName = resourceName;
- _shardName = shardName;
- }
-
- public MessageType getMessageType() {
- return _messageType;
- }
-
- public String getResourceName() {
- return _resourceName;
- }
-
- public String getShardName() {
- return _shardName;
- }
-
- public String getFromState() {
- return _fromState;
- }
-
- public String getToState() {
- return _toState;
- }
-
- public String getMessageId() {
- return _messageId;
- }
-
- public String getInstanceName() {
- return _instanceName;
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java
deleted file mode 100644
index 108f49807..000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-public class MockProtoResponse {
-
- private String _messageId;
-
- public MockProtoResponse(String messageId) {
- System.out.println("Finished process of message : " + messageId);
- _messageId = messageId;
- }
-
- public String getMessageId() {
- return _messageId;
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
index 80bd15aaa..b96694f8b 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
@@ -1,19 +1,15 @@
package org.apache.helix.gateway.service;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.helix.gateway.mock.MockApplication;
-import org.apache.helix.gateway.mock.MockProtoRequest;
-import org.apache.helix.gateway.mock.MockProtoResponse;
+
public class ClusterManager {
private Map<String, Map<String, AtomicBoolean>> _flagMap;
- private Map<String, MockApplication> _channelMap;
private Lock _lock = new ReentrantLock();
// event queue
@@ -21,34 +17,17 @@ public class ClusterManager {
public ClusterManager() {
_flagMap = new ConcurrentHashMap<>();
- _channelMap = new ConcurrentHashMap<>();
}
- public void addChannel(MockApplication mockApplication) {
- _channelMap.put(mockApplication.getInstanceName(), mockApplication);
- _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new
ConcurrentHashMap<>());
+ public void addChannel() {
}
public void removeChannel(String instanceName) {
- _channelMap.remove(instanceName);
_flagMap.remove(instanceName);
}
- public AtomicBoolean sendMessage(MockProtoRequest request) {
- MockApplication mockApplication =
_channelMap.get(request.getInstanceName());
- synchronized (mockApplication) {
- mockApplication.addRequest(request);
- AtomicBoolean flag = new AtomicBoolean(false);
- _flagMap.computeIfAbsent(request.getInstanceName(), k -> new
ConcurrentHashMap<>())
- .put(request.getMessageId(), flag);
- return flag;
- }
- }
-
- public synchronized void receiveResponse(List<MockProtoResponse> responses,
String instanceName) {
- for (MockProtoResponse response : responses) {
- AtomicBoolean flag =
_flagMap.get(instanceName).remove(response.getMessageId());
- flag.set(true);
- }
+ public AtomicBoolean sendMessage() {
+ AtomicBoolean flag = new AtomicBoolean(false);
+ return flag;
}
}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java
deleted file mode 100644
index 37453e7d9..000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.helix.gateway.service;
-
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.gateway.constant.MessageType;
-import org.apache.helix.gateway.mock.MockProtoRequest;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-
-public class HelixGatewayOnlineOfflineStateModel extends StateModel {
- private boolean _firstTime = true;
- private ClusterManager _clusterManager;
-
- private String _resourceName;
- private String _partitionKey;
-
- private AtomicBoolean _completed;
-
- public HelixGatewayOnlineOfflineStateModel(String resourceName, String
partitionKey,
- ClusterManager clusterManager) {
- _resourceName = resourceName;
- _partitionKey = partitionKey;
- _clusterManager = clusterManager;
- }
-
- public void onBecomeOnlineFromOffline(Message message, NotificationContext
context) {
- if (_firstTime) {
- wait(_clusterManager.sendMessage(
- new MockProtoRequest(MessageType.ADD, message.getResourceName(),
- message.getPartitionName(), message.getTgtName(),
UUID.randomUUID().toString(),
- message.getToState(), message.getFromState())));
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " +
message.getTgtName()
- + " with ADD for " + message.getResourceName() + " processed");
- _firstTime = false;
- }
- wait(_clusterManager.sendMessage(
- new MockProtoRequest(MessageType.CHANGE_ROLE,
message.getResourceName(),
- message.getPartitionName(), message.getTgtName(),
UUID.randomUUID().toString(),
- message.getToState(), message.getFromState())));
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " +
message.getTgtName()
- + " with CHANGE_ROLE_OFFLINE_ONLINE for " +
message.getResourceName() + " processed");
- }
-
- public void onBecomeOfflineFromOnline(Message message, NotificationContext
context) {
- wait(_clusterManager.sendMessage(
- new MockProtoRequest(MessageType.CHANGE_ROLE,
message.getResourceName(),
- message.getPartitionName(), message.getTgtName(),
UUID.randomUUID().toString(),
- message.getToState(), message.getFromState())));
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " +
message.getTgtName()
- + " with CHANGE_ROLE_ONLINE_OFFLINE for " +
message.getResourceName() + " processed");
- }
-
- public void onBecomeDroppedFromOffline(Message message, NotificationContext
context) {
- wait(_clusterManager.sendMessage(
- new MockProtoRequest(MessageType.REMOVE, message.getResourceName(),
- message.getPartitionName(), message.getTgtName(),
UUID.randomUUID().toString(),
- message.getToState(), message.getFromState())));
- System.out.println(
- "Message for " + message.getPartitionName() + " instance " +
message.getTgtName()
- + " with REMOVE for " + message.getResourceName() + " processed");
- }
-
- private void wait(AtomicBoolean completed) {
- _completed = completed;
- while (true) {
- try {
- if (_completed.get()) {
- break;
- }
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
index 4f44efcd6..12810f80d 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
@@ -2,11 +2,11 @@ package org.apache.helix.gateway.service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.mock.MockApplication;
+import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+
public class HelixGatewayService {
final private Map<String, Map<String, HelixManager>> _participantsMap;
@@ -28,15 +28,13 @@ public class HelixGatewayService {
System.out.println("Starting Helix Gateway Service");
}
- public void registerParticipant(MockApplication mockApplication) {
- HelixManager manager =
_participantsMap.computeIfAbsent(mockApplication.getClusterName(),
- k -> new
ConcurrentHashMap<>()).computeIfAbsent(mockApplication.getInstanceName(),
- k ->
HelixManagerFactory.getZKHelixManager(mockApplication.getClusterName(),
- mockApplication.getInstanceName(), InstanceType.PARTICIPANT,
_zkAddress));
- manager.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
- new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
+ public void registerParticipant() {
+ // TODO: create participant manager and add to _participantsMap
+ HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress);
+ manager.getStateMachineEngine()
+ .registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
try {
- _clusterManager.addChannel(mockApplication);
+ _clusterManager.addChannel();
manager.connect();
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
new file mode 100644
index 000000000..5c95feb38
--- /dev/null
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
@@ -0,0 +1,64 @@
+package org.apache.helix.gateway.statemodel;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.gateway.service.ClusterManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+
+public class HelixGatewayOnlineOfflineStateModel extends StateModel {
+ private boolean _firstTime = true;
+ private ClusterManager _clusterManager;
+
+ private String _resourceName;
+ private String _partitionKey;
+
+ private AtomicBoolean _completed;
+
+ public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey,
+ ClusterManager clusterManager) {
+ _resourceName = resourceName;
+ _partitionKey = partitionKey;
+ _clusterManager = clusterManager;
+ }
+
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+ if (_firstTime) {
+ wait(_clusterManager.sendMessage());
+ System.out.println(
+ "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for "
+ + message.getResourceName() + " processed");
+ _firstTime = false;
+ }
+ wait(_clusterManager.sendMessage());
+ System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
+ + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed");
+ }
+
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ wait(_clusterManager.sendMessage());
+ System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
+ + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed");
+ }
+
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ wait(_clusterManager.sendMessage());
+ System.out.println(
+ "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for "
+ + message.getResourceName() + " processed");
+ }
+
+ private void wait(AtomicBoolean completed) {
+ _completed = completed;
+ while (true) {
+ try {
+ if (_completed.get()) {
+ break;
+ }
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
similarity index 85%
rename from helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java
rename to helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
index 71570ef15..5db789112 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
@@ -1,5 +1,6 @@
-package org.apache.helix.gateway.service;
+package org.apache.helix.gateway.statemodel;
+import org.apache.helix.gateway.service.ClusterManager;
import org.apache.helix.participant.statemachine.StateModelFactory;
public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory<HelixGatewayOnlineOfflineStateModel> {