Participant syncs its session ID to the controller
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2981bbd1 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2981bbd1 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2981bbd1 Branch: refs/heads/helix-0.6.x Commit: 2981bbd11dc869f5235e85e71f1b30368ef7596e Parents: 9bf4437 Author: Boyan Li <b...@linkedin.com> Authored: Fri Aug 5 11:07:15 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Sun Feb 5 19:15:16 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/HelixManager.java | 5 ++ .../apache/helix/manager/zk/ZKHelixManager.java | 23 ++++-- .../messaging/DefaultMessagingService.java | 4 +- .../messaging/handling/HelixTaskExecutor.java | 75 ++++++++++++++--- .../java/org/apache/helix/model/Message.java | 3 +- .../src/test/java/org/apache/helix/Mocks.java | 6 ++ .../controller/stages/DummyClusterManager.java | 7 ++ .../TestSyncSessionToController.java | 84 ++++++++++++++++++++ .../helix/participant/MockZKHelixManager.java | 6 ++ 9 files changed, 194 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/HelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java index 25c29e6..7b574aa 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java @@ -253,6 +253,11 @@ public interface HelixManager { StateMachineEngine getStateMachineEngine(); /** + * @return the session start time + */ + Long getSessionStartTime(); + + /** * Check if the cluster manager is the leader * @return true if this is a controller and a leader of the cluster */ 
http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index bef2eac..f9d03c3 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -79,7 +79,7 @@ import org.apache.zookeeper.ZooKeeper.States; public class ZKHelixManager implements HelixManager, IZkStateListener { private static Logger LOG = Logger.getLogger(ZKHelixManager.class); - public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec + public static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec public static final int MAX_DISCONNECT_THRESHOLD = 5; public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin"; @@ -125,6 +125,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { private final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>(); private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector; + private Long _sessionStartTime; /** * controller fields @@ -206,7 +207,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { */ _flappingTimeWindowMs = getSystemPropertyAsInt("helixmanager.flappingTimeWindow", - ZKHelixManager.FLAPPING_TIME_WINDIOW); + ZKHelixManager.FLAPPING_TIME_WINDOW); _maxDisconnectThreshold = getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold", @@ -326,9 +327,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { @Override public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception { addListener(listener, new Builder(_clusterName).idealStates(), 
ChangeType.IDEAL_STATE, - new EventType[] { - EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated - }); + new EventType[]{EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated}); } @Override @@ -510,8 +509,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { public void connect() throws Exception { LOG.info("ClusterManager.connect()"); if (isConnected()) { - LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName - + " already connected. skip connect"); + LOG.warn( + "Cluster manager: " + _instanceName + " for cluster: " + _clusterName + " already connected. skip connect"); return; } @@ -520,6 +519,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { case CONTROLLER_PARTICIPANT: if (_controller == null) { _controller = new GenericHelixController(); + _messagingService.getExecutor().setController(_controller); } break; default: @@ -567,6 +567,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } finally { _zkclient.close(); _zkclient = null; + _sessionStartTime = null; LOG.info("Cluster manager: " + _instanceName + " disconnected"); if (_controller != null) { @@ -903,6 +904,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { * setup message listener */ participantHelper.setupMsgHandler(); + + _sessionStartTime = System.currentTimeMillis(); } void handleNewSessionAsController() { @@ -927,4 +930,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { @Override public void handleSessionEstablishmentError(Throwable var1) throws Exception { } + + @Override + public Long getSessionStartTime() { + return _sessionStartTime; + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java index 1121689..f000f69 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java @@ -56,7 +56,7 @@ public class DefaultMessagingService implements ClusterMessagingService { public DefaultMessagingService(HelixManager manager) { _manager = manager; _evaluator = new CriteriaEvaluator(); - _taskExecutor = new HelixTaskExecutor(); + _taskExecutor = new HelixTaskExecutor(this); _asyncCallbackService = new AsyncCallbackService(); _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(), _asyncCallbackService); @@ -187,7 +187,7 @@ public class DefaultMessagingService implements ClusterMessagingService { private List<Message> generateMessagesForController(Message message) { List<Message> messages = new ArrayList<Message>(); - String id = UUID.randomUUID().toString(); + String id = (message.getMsgId() == null) ? 
UUID.randomUUID().toString() : message.getMsgId(); Message newMessage = new Message(message.getRecord(), id); newMessage.setMsgId(id); newMessage.setSrcName(_manager.getInstanceName()); http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index bc536fd..0431770 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -34,20 +34,25 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; +import org.apache.helix.Criteria; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; import org.apache.helix.MessageListener; import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.MapKey; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.model.CurrentState; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; +import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageState; import org.apache.helix.model.Message.MessageType; @@ -104,7 +109,11 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor { public static final String MAX_THREADS = "maxThreads"; private MessageQueueMonitor _messageQueueMonitor; - + private ClusterMessagingService _messagingService; + private GenericHelixController _controller; + private Long _lastSessionSyncTime; + private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds + private static final String SESSION_SYNC = "SESSION-SYNC"; /** * Map of MsgType->MsgHandlerFactoryRegistryItem */ @@ -135,6 +144,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { startMonitorThread(); } + public HelixTaskExecutor(ClusterMessagingService messagingService) { + this(); + _messagingService = messagingService; + } + @Override public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) { registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS); @@ -170,6 +184,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } } + public void setController(GenericHelixController controller) { + _controller = controller; + } + public ParticipantMonitor getParticipantMonitor() { return _monitor; } @@ -386,8 +404,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { _taskMap.remove(taskId); return true; } else { - _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, - "fail to cancel task: " + taskId, + _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId, notificationContext.getManager().getHelixDataAccessor()); } } else { @@ -403,8 +420,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { public void finishTask(MessageTask task) { Message message = task.getMessage(); String taskId = task.getTaskId(); - LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message - .getExecuteStartTimeStamp())); + LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message.getExecuteStartTimeStamp())); 
synchronized (_lock) { if (_taskMap.containsKey(taskId)) { @@ -471,8 +487,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { item.factory().reset(); } - LOG.info("Unregistered message handler factory for type: " + type + ", factory: " - + item.factory() + ", pool: " + pool); + LOG.info( + "Unregistered message handler factory for type: " + type + ", factory: " + item.factory() + ", pool: " + pool); } void reset() { @@ -502,6 +518,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { LOG.warn("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage()); } _taskMap.clear(); + + _lastSessionSyncTime = null; } void init() { @@ -526,6 +544,28 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } } + private void syncSessionToController(HelixManager manager) { + if (_lastSessionSyncTime == null || + System.currentTimeMillis() - _lastSessionSyncTime > SESSION_SYNC_INTERVAL) { // > delay since last sync + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + PropertyKey key = new Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC); + if (accessor.getProperty(key) == null) { + Message msg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC); + msg.setSrcName(manager.getInstanceName()); + msg.setTgtSessionId("*"); + msg.setMsgState(MessageState.NEW); + msg.setMsgId(SESSION_SYNC); + + Criteria cr = new Criteria(); + cr.setRecipientInstanceType(InstanceType.CONTROLLER); + cr.setSessionSpecific(false); + + manager.getMessagingService().send(cr, msg); + _lastSessionSyncTime = System.currentTimeMillis(); + } + } + } + @Override public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) { @@ -596,8 +636,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { + message.getMsgId(); LOG.warn(warningMessage); accessor.removeProperty(message.getKey(keyBuilder, instanceName)); - 
_statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, - accessor); + _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, accessor); + + // Proactively send a session sync message from participant to controller + // upon session mismatch after a new session is established + if (manager.getInstanceType() == InstanceType.PARTICIPANT + || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) { + if (message.getCreateTimeStamp() > manager.getSessionStartTime()) { + syncSessionToController(manager); + } + } + continue; + } + + if ((manager.getInstanceType() == InstanceType.CONTROLLER || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) + && MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) { + PropertyKey key = new Builder(manager.getClusterName()).liveInstances(); + List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key); + _controller.onLiveInstanceChange(liveInstances, changeContext); + accessor.removeProperty(message.getKey(keyBuilder, instanceName)); continue; } http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/model/Message.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java index 31f50ab..2901b1c 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Message.java +++ b/helix-core/src/main/java/org/apache/helix/model/Message.java @@ -48,7 +48,8 @@ public class Message extends HelixProperty { CONTROLLER_MSG, TASK_REPLY, NO_OP, - PARTICIPANT_ERROR_REPORT + PARTICIPANT_ERROR_REPORT, + PARTICIPANT_SESSION_CHANGE }; /** http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/Mocks.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java index 7d27693..9688c0d 100644 --- a/helix-core/src/test/java/org/apache/helix/Mocks.java +++ b/helix-core/src/test/java/org/apache/helix/Mocks.java @@ -31,6 +31,7 @@ import org.I0Itec.zkclient.DataUpdater; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.messaging.AsyncCallback; import org.apache.helix.messaging.handling.HelixTaskExecutor; @@ -456,6 +457,11 @@ public class Mocks { return null; } + @Override + public Long getSessionStartTime() { + return 0L; + } + } public static class MockAccessor implements HelixDataAccessor { http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java index b54ddcc..0167487 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java @@ -39,6 +39,7 @@ import org.apache.helix.PreConnectCallback; import org.apache.helix.PropertyKey; import org.apache.helix.ScopedConfigChangeListener; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.participant.StateMachineEngine; @@ -252,4 +253,10 @@ public class DummyClusterManager implements 
HelixManager { // TODO Auto-generated method stub return null; } + + @Override + public Long getSessionStartTime() { + return 0L; + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java b/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java new file mode 100644 index 0000000..36ecf24 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java @@ -0,0 +1,84 @@ +package org.apache.helix.integration; + +import java.util.Date; +import java.util.List; + +import org.apache.helix.InstanceType; +import org.apache.helix.MessageListener; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.Message; +import org.apache.zookeeper.data.Stat; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestSyncSessionToController extends ZkIntegrationTestBase { + @Test + public void testSyncSessionToController() throws Exception { + System.out.println("START testSyncSessionToController at " + new Date(System.currentTimeMillis())); + + String clusterName = getShortClassName(); + MockParticipantManager[] participants = new MockParticipantManager[5]; + int resourceNb = 10; + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + resourceNb, // 
resources + 1, // partitions per resource + 5, // number of nodes + 1, // replicas + "MasterSlave", true); // do rebalance + + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + + // start participants + for (int i = 0; i < 5; i++) { + String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + ZKHelixManager zkHelixManager = new ZKHelixManager(clusterName, "controllerMessageListener", InstanceType.CONTROLLER, ZK_ADDR); + zkHelixManager.connect(); + MockMessageListener mockMessageListener = new MockMessageListener(); + zkHelixManager.addControllerMessageListener(mockMessageListener); + + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); + ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + String path = keyBuilder.liveInstance("localhost_12918").getPath(); + Stat stat = new Stat(); + ZNRecord data = accessor.get(path, stat, 2); + data.getSimpleFields().put("SESSION_ID", "invalid-id"); + accessor.set(path, data, 2); + Thread.sleep(2000); + + Assert.assertTrue(mockMessageListener.isSessionSyncMessageSent()); + } + + class MockMessageListener implements MessageListener { + private boolean sessionSyncMessageSent = false; + + @Override + public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) { + for (Message message: messages) { + if (message.getMsgId().equals("SESSION-SYNC")) { + sessionSyncMessageSent = true; + } + } + } + + public boolean isSessionSyncMessageSent() { + return sessionSyncMessageSent; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java index 2b6f757..db6974e 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java +++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java @@ -41,6 +41,7 @@ import org.apache.helix.PreConnectCallback; import org.apache.helix.PropertyKey; import org.apache.helix.ScopedConfigChangeListener; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; @@ -259,4 +260,9 @@ public class MockZKHelixManager implements HelixManager { return null; } + @Override + public Long getSessionStartTime() { + return 0L; + } + }