hit-lacus commented on a change in pull request #443: KYLIN-3759 & KYLIN-3744
URL: https://github.com/apache/kylin/pull/443#discussion_r250118484
 
 

 ##########
 File path: stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
 ##########
 @@ -407,26 +448,43 @@ public void doReassign(CubeInstance cubeInstance, CubeAssignment preAssignments,
                 allPositions.add(position);
                 successSyncReplicaSet.add(rs);
             }
-            consumePosition = streamingSource.getSourcePositionHandler().mergePositions(allPositions, MergeStrategy.KEEP_LARGE);
+            consumePosition = streamingSource.getSourcePositionHandler().mergePositions(allPositions,
+                    MergeStrategy.KEEP_LARGE);
             logger.info("the consumer position for cube:{} is:{}", cubeName, consumePosition);
         } catch (Exception e) {
             logger.error("fail to sync assign replicaSet for cube:" + cubeName, e);
             // roll back the success group
+            Set<Integer> needRollback = successSyncReplicaSet.stream().map(ReplicaSet::getReplicaSetID)
+                    .collect(Collectors.toSet());
             for (ReplicaSet rs : successSyncReplicaSet) {
                 StartConsumersRequest request = new StartConsumersRequest();
                 request.setCube(cubeName);
                 try {
                     startConsumersInReplicaSet(rs, request);
+                    needRollback.remove(rs.getReplicaSetID());
                 } catch (IOException e1) {
-                    logger.error(
-                            "fail to start consumers for cube:" + cubeName + " replicaSet:" + rs.getReplicaSetID(), e1);
+                    logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + rs.getReplicaSetID(),
+                            e1);
                 }
             }
-            throw new RuntimeException(e);
+            if (needRollback.isEmpty()) {
+                throw new ClusterStateInconsistentException(ClusterState.ROLLBACK_SUCCESS,
 
 Review comment:
   My code may not be clear enough here. I use `ClusterState.ROLLBACK_SUCCESS` to
indicate that the cluster state is consistent but the reassign itself failed. I
will revise this part later.
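
   To illustrate the distinction described above, here is a minimal, self-contained sketch of the pattern in the diff: track which replica sets still need to be rolled back, then report whether the rollback left the cluster consistent. All names below (`RollbackSketch`, `RollbackOutcome`, `ReassignFailedException`, `rollback`) are hypothetical and are not Kylin's actual classes; the restart step is modelled as a plain predicate rather than a real `startConsumersInReplicaSet` call.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.IntPredicate;

// Hypothetical names throughout; this is an illustration, not Kylin's API.
class RollbackSketch {

    /** Outcome of the rollback that follows a failed reassign. */
    enum RollbackOutcome {
        /** Every stopped replica set was restarted: cluster consistent, reassign still failed. */
        ROLLBACK_SUCCESS,
        /** At least one replica set could not be restarted: cluster may be inconsistent. */
        ROLLBACK_FAILED
    }

    /** Carries the rollback outcome together with the original failure. */
    static class ReassignFailedException extends RuntimeException {
        final RollbackOutcome outcome;
        final Set<Integer> notRolledBack;

        ReassignFailedException(RollbackOutcome outcome, Set<Integer> notRolledBack, Throwable cause) {
            super("reassign failed, rollback outcome: " + outcome, cause);
            this.outcome = outcome;
            this.notRolledBack = notRolledBack;
        }
    }

    /**
     * Tries to restart every replica set that was stopped before the failure.
     * `restart` returns true when the restart of the given replica set id succeeded.
     */
    static ReassignFailedException rollback(Set<Integer> stoppedIds, IntPredicate restart, Throwable cause) {
        // Start by assuming every stopped replica set still needs a rollback.
        Set<Integer> needRollback = new TreeSet<>(stoppedIds);
        for (int id : stoppedIds) {
            if (restart.test(id)) {
                needRollback.remove(id); // restarted, so no longer pending
            }
        }
        RollbackOutcome outcome = needRollback.isEmpty()
                ? RollbackOutcome.ROLLBACK_SUCCESS
                : RollbackOutcome.ROLLBACK_FAILED;
        return new ReassignFailedException(outcome, needRollback, cause);
    }
}
```

   In this sketch the caller would write `throw RollbackSketch.rollback(ids, restartFn, e);` inside the `catch` block. The exception is thrown in both cases because the reassign failed either way; only the recorded outcome tells the caller whether the cluster is still consistent.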

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
