Repository: asterixdb Updated Branches: refs/heads/master 8a7894f58 -> 893d385f7
[NO ISSUE][TX] Ensure TxnIdFactory Value is Initialized - user model changes: no - storage format changes: no - interface changes: no Details: - Report local max txn id after node registration. - Add node status BOOTING. - Distinguish between node first time registration and registration after restarting by using NodeStatus BOOTING to respond with the proper node post registration tasks. - Rename node status ALIVE -> ACTIVE. - Rename StartupTask* to RegistrationTasks* Change-Id: I6899c9e7d6e744ca92d0108556e086a23639d78b Reviewed-on: https://asterix-gerrit.ics.uci.edu/2151 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/893d385f Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/893d385f Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/893d385f Branch: refs/heads/master Commit: 893d385f7132d002fb3219d1e55e03836920d61a Parents: 8a7894f Author: Murtadha Hubail <[email protected]> Authored: Wed Nov 15 02:35:03 2017 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Wed Nov 15 02:31:21 2017 -0800 ---------------------------------------------------------------------- .../asterix/app/nc/NCAppRuntimeContext.java | 4 +- .../app/nc/task/ReportLocalCountersTask.java | 40 +++++++++ .../app/nc/task/ReportMaxResourceIdTask.java | 40 --------- .../replication/AutoFaultToleranceStrategy.java | 20 ++--- .../MetadataNodeFaultToleranceStrategy.java | 20 ++--- .../replication/NoFaultToleranceStrategy.java | 46 +++++++--- .../message/NCLifecycleTaskReportMessage.java | 2 +- .../RegistrationTasksRequestMessage.java | 81 +++++++++++++++++ .../RegistrationTasksResponseMessage.java | 93 ++++++++++++++++++++ .../message/StartupTaskRequestMessage.java | 71 --------------- .../message/StartupTaskResponseMessage.java | 93 -------------------- .../hyracks/bootstrap/NCApplication.java | 30 ++++--- .../common/replication/INCLifecycleMessage.java | 8 +- .../message/ReportLocalCountersMessage.java | 74 ++++++++++++++++ .../ReportLocalCountersRequestMessage.java | 38 ++++++++ .../message/ReportMaxResourceIdMessage.java | 72 --------------- .../ReportMaxResourceIdRequestMessage.java | 38 -------- .../message/ResourceIdRequestMessage.java | 2 +- .../service/transaction/TxnIdFactory.java | 2 +- .../apache/hyracks/api/client/NodeStatus.java | 3 +- .../hyracks/control/cc/cluster/NodeManager.java | 2 +- .../hyracks/hdfs/scheduler/SchedulerTest.java | 4 +- .../apache/hyracks/test/support/TestUtils.java | 2 +- 23 files changed, 413 insertions(+), 372 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 7b08f68..e77d535 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -481,7 +481,9 @@ public class NCAppRuntimeContext implements INcApplicationContext { @Override public synchronized void unexportMetadataNodeStub() throws RemoteException { - UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false); + if (metadataNodeStub != null) { + UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false); + } metadataNodeStub = null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java new file mode 100644 index 0000000..86f7d1c --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.runtime.message.ReportLocalCountersMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReportLocalCountersTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + ReportLocalCountersMessage.send((NodeControllerService) cs); + } + + @Override + public String toString() { + return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java deleted file mode 100644 index 22d3cde..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.nc.task; - -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class ReportMaxResourceIdTask implements INCLifecycleTask { - - private static final long serialVersionUID = 1L; - - @Override - public void perform(IControllerService cs) throws HyracksDataException { - ReportMaxResourceIdMessage.send((NodeControllerService) cs); - } - - @Override - public String toString() { - return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java index 4ac1305..23f225e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java @@ -33,7 +33,7 @@ import org.apache.asterix.app.nc.task.BindMetadataNodeTask; import org.apache.asterix.app.nc.task.CheckpointTask; import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartFailbackTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.nc.task.StartReplicationServiceTask; @@ -43,8 +43,8 @@ import org.apache.asterix.app.replication.message.CompleteFailbackResponseMessag import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage; import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage; -import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; -import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage; import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage; import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage; @@ -431,10 +431,10 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { @Override public synchronized void process(INCLifecycleMessage message) throws HyracksDataException { switch (message.getType()) { - case STARTUP_TASK_REQUEST: - process((StartupTaskRequestMessage) message); + case REGISTRATION_TASKS_REQUEST: + process((RegistrationTasksRequestMessage) message); break; - case STARTUP_TASK_RESULT: + case REGISTRATION_TASKS_RESULT: process((NCLifecycleTaskReportMessage) message); break; case TAKEOVER_PARTITION_RESPONSE: @@ -483,7 +483,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { currentMetadataNode = clusterManager.getCurrentMetadataNodeId(); } - private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException { + private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); final SystemState state = msg.getState(); List<INCLifecycleTask> tasks; @@ -493,7 +493,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { // failed node returned. Need to start failback process tasks = buildFailbackStartupSequence(); } - StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); } catch (Exception e) { @@ -504,7 +504,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { private List<INCLifecycleTask> buildFailbackStartupSequence() { final List<INCLifecycleTask> tasks = new ArrayList<>(); tasks.add(new StartFailbackTask()); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new StartLifecycleComponentsTask()); return tasks; } @@ -517,7 +517,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { tasks.add(new MetadataBootstrapTask()); } tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); if (isMetadataNode) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java index 1b57403..3341813 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java @@ -35,14 +35,14 @@ import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; import org.apache.asterix.app.nc.task.LocalRecoveryTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; import org.apache.asterix.app.nc.task.RemoteRecoveryTask; -import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.nc.task.StartReplicationServiceTask; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage; import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage; -import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; -import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; @@ -123,10 +123,10 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate @Override public synchronized void process(INCLifecycleMessage message) throws HyracksDataException { switch (message.getType()) { - case STARTUP_TASK_REQUEST: - process((StartupTaskRequestMessage) message); + case REGISTRATION_TASKS_REQUEST: + process((RegistrationTasksRequestMessage) message); break; - case STARTUP_TASK_RESULT: + case REGISTRATION_TASKS_RESULT: process((NCLifecycleTaskReportMessage) message); break; case REPLAY_LOGS_RESPONSE: @@ -150,7 +150,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate } } - private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException { + private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); final SystemState state = msg.getState(); final boolean isParticipant = replicationStrategy.isParticipant(nodeId); @@ -160,7 +160,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate } else { tasks = buildParticipantStartupSequence(nodeId, state); } - StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); } catch (Exception e) { @@ -199,7 +199,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate tasks.add(rt); } tasks.add(new ExternalLibrarySetupTask(false)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); return tasks; @@ -234,7 +234,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate tasks.add(new MetadataBootstrapTask()); } tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); if (isMetadataNode) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java index b9ea135..a273845 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java @@ -32,11 +32,11 @@ import org.apache.asterix.app.nc.task.CheckpointTask; import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; import org.apache.asterix.app.nc.task.LocalRecoveryTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; -import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; -import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; @@ -48,12 +48,13 @@ import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.exceptions.HyracksDataException; public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName()); - IClusterStateManager clusterManager; + private IClusterStateManager clusterManager; private String metadataNodeId; private Set<String> pendingStartupCompletionNodes = new HashSet<>(); private ICCMessageBroker messageBroker; @@ -76,10 +77,10 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { @Override public void process(INCLifecycleMessage message) throws HyracksDataException { switch (message.getType()) { - case STARTUP_TASK_REQUEST: - process((StartupTaskRequestMessage) message); + case REGISTRATION_TASKS_REQUEST: + process((RegistrationTasksRequestMessage) message); break; - case STARTUP_TASK_RESULT: + case REGISTRATION_TASKS_RESULT: process((NCLifecycleTaskReportMessage) message); break; default: @@ -100,10 +101,10 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { metadataNodeId = clusterManager.getCurrentMetadataNodeId(); } - private void process(StartupTaskRequestMessage msg) throws HyracksDataException { + private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); - List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState()); - StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState()); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); } catch (Exception e) { @@ -126,7 +127,16 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { } } - private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) { + private List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) { + LOGGER.log(Level.INFO, () -> "Building registration tasks for node: " + nodeId + " with state: " + state); + final boolean isMetadataNode = nodeId.equals(metadataNodeId); + if (nodeStatus == NodeStatus.ACTIVE) { + /* + * if the node state is already ACTIVE then it completed + * booting and just re-registering with a new/failed CC. + */ + return buildActiveNCRegTasks(isMetadataNode); + } final List<INCLifecycleTask> tasks = new ArrayList<>(); if (state == SystemState.CORRUPTED) { //need to perform local recovery for node partitions @@ -134,12 +144,11 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); tasks.add(rt); } - final boolean isMetadataNode = nodeId.equals(metadataNodeId); if (isMetadataNode) { tasks.add(new MetadataBootstrapTask()); } tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); if (isMetadataNode) { @@ -147,4 +156,15 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { } return tasks; } + + private List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) { + final List<INCLifecycleTask> tasks = new ArrayList<>(); + if (metadataNode) { + // need to unbind from old distributed state then rebind to new one + tasks.add(new BindMetadataNodeTask(false)); + tasks.add(new BindMetadataNodeTask(true)); + } + tasks.add(new ReportLocalCountersTask()); + return tasks; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java index 2b32e1f..b654fd8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -58,6 +58,6 @@ public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAdd @Override public MessageType getType() { - return MessageType.STARTUP_TASK_RESULT; + return MessageType.REGISTRATION_TASKS_RESULT; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java new file mode 100644 index 0000000..075c415 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.replication.message; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.hyracks.api.client.NodeStatus; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage { + + private static final Logger LOGGER = Logger.getLogger(RegistrationTasksRequestMessage.class.getName()); + private static final long serialVersionUID = 1L; + private final SystemState state; + private final String nodeId; + private final NodeStatus nodeStatus; + + public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state) { + this.state = state; + this.nodeId = nodeId; + this.nodeStatus = nodeStatus; + } + + public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState) + throws HyracksDataException { + try { + RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, + systemState); + ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Unable to send RegistrationTasksRequestMessage to CC", e); + throw HyracksDataException.create(e); + } + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + appCtx.getFaultToleranceStrategy().process(this); + } + + public SystemState getState() { + return state; + } + + public String getNodeId() { + return nodeId; + } + + public NodeStatus getNodeStatus() { + return nodeStatus; + } + + @Override + public MessageType getType() { + return MessageType.REGISTRATION_TASKS_REQUEST; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java new file mode 100644 index 0000000..13525e3 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.replication.message; + +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NCShutdownHook; +import org.apache.hyracks.util.ExitUtil; + +public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage { + + private static final Logger LOGGER = Logger.getLogger(RegistrationTasksResponseMessage.class.getName()); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final List<INCLifecycleTask> tasks; + + public RegistrationTasksResponseMessage(String nodeId, List<INCLifecycleTask> tasks) { + this.nodeId = nodeId; + this.tasks = tasks; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + IControllerService cs = appCtx.getServiceContext().getControllerService(); + boolean success = true; + try { + Throwable exception = null; + try { + for (INCLifecycleTask task : tasks) { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Starting startup task: " + task); + } + task.perform(cs); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Completed startup task: " + task); + } + } + } catch (Throwable e) { //NOSONAR all startup failures should be reported to CC + LOGGER.log(Level.SEVERE, "Failed during startup task", e); + success = false; + exception = e; + } + NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success); + result.setException(exception); + try { + broker.sendMessageToCC(result); + } catch (Exception e) { + success = false; + LOGGER.log(Level.SEVERE, "Failed sending message to cc", e); + } + } finally { + if (!success) { + // stop NC so that it can be started again + ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE); + } + } + } + + public String getNodeId() { + return nodeId; + } + + @Override + public MessageType getType() { + return MessageType.REGISTRATION_TASKS_RESPONSE; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java deleted file mode 100644 index 21dee9c..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.replication.message; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.messaging.api.ICcAddressedMessage; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.replication.INCLifecycleMessage; -import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class StartupTaskRequestMessage implements INCLifecycleMessage, ICcAddressedMessage { - - private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName()); - private static final long serialVersionUID = 1L; - private final SystemState state; - private final String nodeId; - - public StartupTaskRequestMessage(String nodeId, SystemState state) { - this.state = state; - this.nodeId = nodeId; - } - - public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException { - try { - StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState); - ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e); - throw HyracksDataException.create(e); - } - } - - @Override - public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - appCtx.getFaultToleranceStrategy().process(this); - } - - public SystemState getState() { - return state; - } - - public String getNodeId() { - return nodeId; - } - - @Override - public MessageType getType() { - return MessageType.STARTUP_TASK_REQUEST; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java deleted file mode 100644 index b941343..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.replication.message; - -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.asterix.common.replication.INCLifecycleMessage; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -import org.apache.hyracks.control.nc.NCShutdownHook; -import org.apache.hyracks.util.ExitUtil; - -public class StartupTaskResponseMessage implements INCLifecycleMessage, INcAddressedMessage { - - private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName()); - private static final long serialVersionUID = 1L; - private final String nodeId; - private final List<INCLifecycleTask> tasks; - - public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) { - this.nodeId = nodeId; - this.tasks = tasks; - } - - @Override - public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - IControllerService cs = appCtx.getServiceContext().getControllerService(); - boolean success = true; - try { - Throwable exception = null; - try { - for (INCLifecycleTask task : tasks) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.log(Level.INFO, "Starting startup task: " + task); - } - task.perform(cs); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.log(Level.INFO, "Completed startup task: " + task); - } - } - } catch (Throwable e) { //NOSONAR all startup failures should be reported to CC - LOGGER.log(Level.SEVERE, "Failed during startup task", e); - success = false; - exception = e; - } - NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success); - result.setException(exception); - try { - broker.sendMessageToCC(result); - } catch (Exception e) { - success = false; - LOGGER.log(Level.SEVERE, "Failed sending message to cc", e); - } - } finally { - if (!success) { - // stop NC so that it can be started again - ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE); - } - } - } - - public String getNodeId() { - return nodeId; - } - - @Override - public MessageType getType() { - return MessageType.STARTUP_TASK_RESPONSE; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 3d7f870..a18535d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -26,7 +26,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.app.nc.NCAppRuntimeContext; -import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.AsterixExtension; @@ -48,6 +48,7 @@ import org.apache.asterix.messaging.NCMessageBroker; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; @@ -67,7 +68,6 @@ public class NCApplication extends BaseNCApplication { private String nodeId; private boolean stopInitiated; private boolean startupCompleted; - private SystemState systemState; protected WebManager webManager; @Override @@ -117,9 +117,8 @@ public class NCApplication extends BaseNCApplication { this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory); IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); - systemState = recoveryMgr.getSystemState(); - - if (systemState == SystemState.PERMANENT_DATA_LOSS) { + final SystemState stateOnStartup = recoveryMgr.getSystemState(); + if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("System state: " + SystemState.PERMANENT_DATA_LOSS); LOGGER.info("Node ID: " + nodeId); @@ -187,20 +186,27 @@ public class NCApplication extends BaseNCApplication { // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag final NodeProperties nodeProperties = runtimeContext.getNodeProperties(); - if (systemState == SystemState.PERMANENT_DATA_LOSS - && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) { - systemState = SystemState.BOOTSTRAPPING; + IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); + SystemState state = recoveryMgr.getSystemState(); + if (state == SystemState.PERMANENT_DATA_LOSS && (nodeProperties.isInitialRun() || nodeProperties + .isVirtualNc())) { + state = SystemState.BOOTSTRAPPING; } - // Request startup tasks from CC - StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState); + // Request registration tasks from CC + RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), + NodeStatus.BOOTING, state); startupCompleted = true; } @Override public void onRegisterNode() throws Exception { if (startupCompleted) { - // Request startup tasks from CC - StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState); + /* + * If the node completed its startup before, then this is a re-registration with + * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE + */ + RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), + NodeStatus.ACTIVE, SystemState.HEALTHY); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java index 87b0856..cb9fa8f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java @@ -22,16 +22,16 @@ import org.apache.hyracks.api.messages.IMessage; public interface INCLifecycleMessage extends IMessage { - public enum MessageType { + enum MessageType { REPLAY_LOGS_REQUEST, REPLAY_LOGS_RESPONSE, PREPARE_FAILBACK_REQUEST, PREPARE_FAILBACK_RESPONSE, COMPLETE_FAILBACK_REQUEST, COMPLETE_FAILBACK_RESPONSE, - STARTUP_TASK_REQUEST, - STARTUP_TASK_RESPONSE, - STARTUP_TASK_RESULT, + REGISTRATION_TASKS_REQUEST, + REGISTRATION_TASKS_RESPONSE, + REGISTRATION_TASKS_RESULT, TAKEOVER_PARTITION_REQUEST, TAKEOVER_PARTITION_RESPONSE, TAKEOVER_METADATA_NODE_REQUEST, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java new file mode 100644 index 0000000..3f8ced8 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReportLocalCountersMessage implements ICcAddressedMessage { + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(ReportLocalCountersMessage.class.getName()); + private final long maxResourceId; + private final long maxTxnId; + private final String src; + + public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId) { + this.src = src; + this.maxResourceId = maxResourceId; + this.maxTxnId = maxTxnId; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); + TxnIdFactory.ensureMinimumId(maxTxnId); + resourceIdManager.report(src, maxResourceId); + } + + public static void send(NodeControllerService cs) throws HyracksDataException { + NodeControllerService ncs = cs; + INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); + long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), + MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); + long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId(); + ReportLocalCountersMessage countersMessage = + new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId); + try { + ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Unable to report local counters", e); + throw HyracksDataException.create(e); + } + } + + @Override + public String toString() { + return ReportLocalCountersMessage.class.getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java new file mode 100644 index 0000000..785ad2f --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReportLocalCountersRequestMessage implements INcAddressedMessage { + private static final long serialVersionUID = 1L; + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService()); + } + + @Override + public String toString() { + return ReportLocalCountersRequestMessage.class.getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java deleted file mode 100644 index 277c0ba..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.messaging.api.ICcAddressedMessage; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; -import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class ReportMaxResourceIdMessage implements ICcAddressedMessage { - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName()); - private final long maxResourceId; - private final String src; - - public ReportMaxResourceIdMessage(String src, long maxResourceId) { - this.src = src; - this.maxResourceId = maxResourceId; - } - - public long getMaxResourceId() { - return maxResourceId; - } - - @Override - public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); - resourceIdManager.report(src, maxResourceId); - } - - public static void send(NodeControllerService cs) throws HyracksDataException { - NodeControllerService ncs = cs; - INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); - long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), - MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); - ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId); - try { - ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e); - throw HyracksDataException.create(e); - } - } - - @Override - public String toString() { - return ReportMaxResourceIdMessage.class.getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java deleted file mode 100644 index a43376d..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class ReportMaxResourceIdRequestMessage implements INcAddressedMessage { - private static final long serialVersionUID = 1L; - - @Override - public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ReportMaxResourceIdMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService()); - } - - @Override - public String toString() { - return ReportMaxResourceIdRequestMessage.class.getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index decc1a9..a2f4aa1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -61,7 +61,7 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager, ICCMessageBroker broker) throws Exception { Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(); - ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage(); + ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage(); for (String nodeId : getParticipantNodes) { if (!resourceIdManager.reported(nodeId)) { broker.sendApplicationMessageToNC(msg, nodeId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java index 71d7f56..eb59e74 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java @@ -37,6 +37,6 @@ public class TxnIdFactory { } public static void ensureMinimumId(long id) { - TxnIdFactory.id.set(id); + TxnIdFactory.id.updateAndGet(current -> Math.max(current, id)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java index b84f1f2..10a9a3c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java @@ -19,6 +19,7 @@ package org.apache.hyracks.api.client; public enum NodeStatus { - ALIVE, + ACTIVE, + BOOTING, DEAD } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 3cd6235..4928564 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -136,7 +136,7 @@ public class NodeManager implements INodeManager { public Map<String, NodeControllerInfo> getNodeControllerInfoMap() { Map<String, NodeControllerInfo> result = new LinkedHashMap<>(); nodeRegistry.forEach( - (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ALIVE, ncState.getDataPort(), + (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(), ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores()))); return result; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java index 445a15c..bb28c79 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java @@ -97,10 +97,10 @@ public class SchedulerTest extends TestCase { Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.", dataPort, resultPort, messagingPort); ncNameToNcInfos.put("nc7", - new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", dataPort), + new NodeControllerInfo("nc7", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.7", dataPort), new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2)); ncNameToNcInfos.put("nc12", - new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", dataPort), + new NodeControllerInfo("nc12", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.12", dataPort), new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2)); InputSplit[] fileSplits = new InputSplit[12]; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java index c3d86e8..1814e85 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java @@ -135,7 +135,7 @@ public class TestUtils { String ncId = ncNamePrefix + i; String ncAddress = addressPrefix + i; ncNameToNcInfos.put(ncId, - new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort), + new NodeControllerInfo(ncId, NodeStatus.ACTIVE, new NetworkAddress(ncAddress, netPort), new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort), 2)); } return ncNameToNcInfos;
