allenma commented on a change in pull request #443: KYLIN-3759 & KYLIN-3744
URL: https://github.com/apache/kylin/pull/443#discussion_r250115925
##########
File path:
stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
##########
@@ -478,24 +544,61 @@ public void doReassign(CubeInstance cubeInstance, CubeAssignment preAssignments,
stopRequest.setRemoveData(true);
}
stopConsumersInReplicaSet(rs, stopRequest);
+ rollbackStarted.remove(rs.getReplicaSetID());
} catch (IOException e1) {
logger.error("fail to stop consumers for cube:" + cubeName
+ " replicaSet:" + rs.getReplicaSetID(),
e1);
}
}
// roll back success assignment
+ Set<Integer> rollbackAssigned = successAssigned.stream().map(ReplicaSet::getReplicaSetID)
+ .collect(Collectors.toSet());
for (ReplicaSet rs : successAssigned) {
try {
List<Partition> partitions =
preAssignments.getPartitionsByReplicaSetID(rs.getReplicaSetID());
assignCubeToReplicaSet(rs, cubeName, partitions, true,
true);
+ rollbackAssigned.remove(rs.getReplicaSetID());
} catch (IOException e1) {
- logger.error(
- "fail to start consumers for cube:" + cubeName + " replicaSet:" + rs.getReplicaSetID(), e1);
+ logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + rs.getReplicaSetID(),
+ e1);
}
}
- throw new RuntimeException(e);
+ Set<Node> failedReceiver = new HashSet<>(failedConvertToImmutableNodes);
+ for (Node node : failedConvertToImmutableNodes) {
+ try {
+ makeCubeImmutableForReceiver(node, cubeName);
+ failedReceiver.remove(node);
+ } catch (IOException ioe) {
+ logger.error("fail to make cube immutable for cube:" + cubeName + " to " + node, ioe);
+ }
+ }
+
+ StringBuilder str = new StringBuilder();
+ try {
+ str.append(" Started:").append(JsonUtil.writeValueAsString(rollbackStarted));
+ str.append(" Assigned:").append(JsonUtil.writeValueAsString(rollbackAssigned));
+ str.append(" RemotedPresisted:").append(JsonUtil.writeValueAsString(failedReceiver));
+ } catch (JsonProcessingException jpe) {
+ logger.error("", jpe);
+ }
+
+ String failedInfo = str.toString();
+
+ if (!rollbackStarted.isEmpty()) {
+ throw new ClusterStateInconsistentException(ClusterState.ROLLBACK_FAILED, TransactionStep.START_NEW,
+ failedInfo, e);
+ } else if (!rollbackAssigned.isEmpty()) {
+ throw new ClusterStateInconsistentException(ClusterState.ROLLBACK_FAILED, TransactionStep.ASSIGN_NEW,
+ failedInfo, e);
+ } else if (!failedReceiver.isEmpty()) {
+ throw new ClusterStateInconsistentException(ClusterState.ROLLBACK_FAILED,
+ TransactionStep.MAKE_IMMUTABLE, failedInfo, e);
+ } else {
+ throw new ClusterStateInconsistentException(ClusterState.ROLLBACK_SUCCESS, TransactionStep.ASSIGN_NEW,
Review comment:
If the rollback succeeds, the cluster state should be consistent. The state
should also be tracked at the cube level.
Should we also store a cube's inconsistent state somewhere and return it to the
client, so that we can show the state in the GUI?
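
To make the suggestion concrete, here is a rough sketch of what a cube-level record could look like. Everything in it is hypothetical: `CubeRollbackStates`, `Step`, `firstFailedStep`, `record` and `stateOf` are names made up for illustration and are not part of Kylin or of this PR. The idea is only that a fully successful rollback leaves no entry for the cube, while a failed one stores the step that could not be undone, so the client can query it later.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch only: none of these names exist in Kylin.
class CubeRollbackStates {
    // Mirrors the steps used in the diff above (assumed enum, for illustration).
    enum Step { START_NEW, ASSIGN_NEW, MAKE_IMMUTABLE }

    // Cube name -> step whose rollback failed. No entry means the cube is consistent.
    private final Map<String, Step> inconsistent = new ConcurrentHashMap<>();

    // Returns the first step that still has leftover items after rollback,
    // or an empty Optional when the rollback fully succeeded.
    static Optional<Step> firstFailedStep(Set<?> rollbackStarted,
                                          Set<?> rollbackAssigned,
                                          Set<?> failedReceiver) {
        if (!rollbackStarted.isEmpty()) {
            return Optional.of(Step.START_NEW);
        }
        if (!rollbackAssigned.isEmpty()) {
            return Optional.of(Step.ASSIGN_NEW);
        }
        if (!failedReceiver.isEmpty()) {
            return Optional.of(Step.MAKE_IMMUTABLE);
        }
        return Optional.empty();
    }

    // Stores the failed step for the cube, or clears the entry on success.
    void record(String cubeName, Optional<Step> failedStep) {
        if (failedStep.isPresent()) {
            inconsistent.put(cubeName, failedStep.get());
        } else {
            inconsistent.remove(cubeName);
        }
    }

    // What a client (for example the GUI) could ask for per cube.
    Optional<Step> stateOf(String cubeName) {
        return Optional.ofNullable(inconsistent.get(cubeName));
    }
}
```

With something along these lines, the last `else` branch would only need to clear the cube's entry and rethrow the original error, instead of raising an inconsistency exception for a state that is in fact consistent. Where the map is persisted is left open here.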