This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch consensus_module_refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 54befaace307079cc3e97b5f7e43e36a2814b000 Author: BUAAserein <[email protected]> AuthorDate: Thu Aug 17 15:54:58 2023 +0800 refactor createLocalPeer --- .../iotdb/confignode/manager/ConfigManager.java | 6 +- .../manager/consensus/ConsensusManager.java | 23 ++- .../apache/iotdb/consensus/iot/StabilityTest.java | 14 +- .../iotdb/consensus/ratis/RatisConsensusTest.java | 8 +- .../iotdb/consensus/ratis/RecoverReadTest.java | 16 +- .../iotdb/consensus/simple/RecoveryTest.java | 19 ++- .../consensus/simple/SimpleConsensusTest.java | 176 ++++++++++++--------- .../impl/DataNodeInternalRPCServiceImpl.java | 21 +-- .../thrift/impl/DataNodeRegionManager.java | 26 ++- 9 files changed, 173 insertions(+), 136 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index bbfaff13330..0b586bff9a0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -172,6 +172,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq; import org.apache.iotdb.consensus.common.DataSet; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType; import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil; @@ -1135,8 +1136,9 @@ public class ConfigManager implements IManager { } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn("Unexpected interruption during retry creating peer for consensus group"); - } catch (Exception e) { - LOGGER.error("Failed to create peer for consensus group", e); + } catch (ConsensusException e) { + LOGGER.error( + "Something wrong happened while calling consensus layer's createLocalPeer API.", e); break; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 5d0d0c6551f..62345c5a136 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -202,18 +202,26 @@ public class ConsensusManager { createPeerForConsensusGroup(SystemPropertiesUtils.loadConfigNodeList()); } catch (BadNodeUrlException e) { throw new IOException(e); + } catch (ConsensusException e) { + LOGGER.error( + "Something wrong happened while calling consensus layer's createLocalPeer API.", e); } } LOGGER.info("Init ConsensusManager successfully when restarted"); } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { // Create ConsensusGroup that contains only itself // if the current ConfigNode is Seed-ConfigNode - createPeerForConsensusGroup( - Collections.singletonList( - new TConfigNodeLocation( - SEED_CONFIG_NODE_ID, - new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), - new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); + try { + createPeerForConsensusGroup( + Collections.singletonList( + new TConfigNodeLocation( + SEED_CONFIG_NODE_ID, + new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), + new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); + } catch (ConsensusException e) { + LOGGER.error( + "Something wrong happened while calling consensus layer's createLocalPeer API.", e); + } } } @@ -239,7 +247,8 @@ public class ConsensusManager { * * @param configNodeLocations All registered ConfigNodes */ - public void createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) { + public void createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) + throws ConsensusException { LOGGER.info("createPeerForConsensusGroup {}...", configNodeLocations); List<Peer> peerList = new ArrayList<>(); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index c4333c860ea..ec6a7018942 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -26,8 +26,8 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.Peer; -import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException; import org.apache.iotdb.consensus.iot.util.TestStateMachine; @@ -100,17 +100,13 @@ public class StabilityTest { consensusImpl = null; constructConsensus(); - - ConsensusGenericResponse response = - consensusImpl.createLocalPeer( - dataRegionId, - Collections.singletonList( - new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); - Assert.assertTrue(response.isSuccess()); + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); consensusImpl.deleteLocalPeer(dataRegionId); } - public void snapshotTest() throws IOException { + public void snapshotTest() throws IOException, ConsensusException { consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index 8da2d467248..10dcdfcc0c8 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -116,9 +116,11 @@ public class RatisConsensusTest { servers.get(0).createLocalPeer(group.getGroupId(), original); doConsensus(servers.get(0), group.getGroupId(), 10, 10); - ConsensusGenericResponse resp = servers.get(0).createLocalPeer(group.getGroupId(), original); - Assert.assertFalse(resp.isSuccess()); - Assert.assertTrue(resp.getException() instanceof RatisRequestFailedException); + try { + servers.get(0).createLocalPeer(group.getGroupId(), original); + } catch (ConsensusException e) { + Assert.assertTrue(e instanceof RatisRequestFailedException); + } // add 2 members servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList()); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java index a4f7c2a1ecb..f987c70d4a1 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java @@ -124,7 +124,9 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); + for (RatisConsensus s : miniCluster.getServers()) { + s.createLocalPeer(gid, members); + } // first write 10 ops TestUtils.write(miniCluster.getServer(0), gid, 10); @@ -151,7 +153,9 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); + for (RatisConsensus s : miniCluster.getServers()) { + s.createLocalPeer(gid, members); + } // first write 10 ops TestUtils.write(miniCluster.getServer(0), gid, 10); @@ -193,7 +197,9 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); + for (RatisConsensus s : miniCluster.getServers()) { + s.createLocalPeer(gid, members); + } // first write 10 ops TestUtils.write(miniCluster.getServer(0), gid, 10); @@ -222,7 +228,9 @@ public class RecoverReadTest { final ConsensusGroupId gid = miniCluster.getGid(); final List<Peer> members = miniCluster.getPeers(); - miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members)); + for (RatisConsensus s : miniCluster.getServers()) { + s.createLocalPeer(gid, members); + } // first write 30 ops TestUtils.write(miniCluster.getServer(0), gid, 50); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java index 5fe3928925e..e2f9167b6e2 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java @@ -27,8 +27,8 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.EmptyStateMachine; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.Peer; -import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.ratis.util.FileUtils; @@ -86,14 +86,13 @@ public class RecoveryTest { consensusImpl = null; constructConsensus(); - - ConsensusGenericResponse response = - consensusImpl.createLocalPeer( - schemaRegionId, - Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 9000)))); - - Assert.assertEquals( - response.getException().getMessage(), - new ConsensusGroupAlreadyExistException(schemaRegionId).getMessage()); + try { + consensusImpl.createLocalPeer( + schemaRegionId, + Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 9000)))); + } catch (ConsensusException e) { + Assert.assertEquals( + e.getMessage(), new ConsensusGroupAlreadyExistException(schemaRegionId).getMessage()); + } } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index e70c1a6b24e..f4d6eb58296 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -35,7 +35,6 @@ import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.exception.*; @@ -163,42 +162,47 @@ public class SimpleConsensusTest { @Test public void addConsensusGroup() { - ConsensusGenericResponse response1 = - consensusImpl.createLocalPeer( - dataRegionId, - Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertTrue(response1.isSuccess()); - assertNull(response1.getException()); - - ConsensusGenericResponse response2 = - consensusImpl.createLocalPeer( - dataRegionId, - Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertFalse(response2.isSuccess()); - assertTrue(response2.getException() instanceof ConsensusGroupAlreadyExistException); - - ConsensusGenericResponse response3 = - consensusImpl.createLocalPeer( - dataRegionId, - Arrays.asList( - new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)), - new Peer(dataRegionId, 1, new TEndPoint("0.0.0.1", 6667)))); - assertFalse(response3.isSuccess()); - assertTrue(response3.getException() instanceof IllegalPeerNumException); - - ConsensusGenericResponse response4 = - consensusImpl.createLocalPeer( - dataRegionId, - Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.1", 6667)))); - assertFalse(response4.isSuccess()); - assertTrue(response4.getException() instanceof IllegalPeerEndpointException); - - ConsensusGenericResponse response5 = - consensusImpl.createLocalPeer( - schemaRegionId, - Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertTrue(response5.isSuccess()); - assertNull(response5.getException()); + try { + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } + + try { + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + } catch (ConsensusException e) { + assertTrue(e instanceof ConsensusGroupAlreadyExistException); + } + + try { + consensusImpl.createLocalPeer( + dataRegionId, + Arrays.asList( + new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)), + new Peer(dataRegionId, 1, new TEndPoint("0.0.0.1", 6667)))); + } catch (ConsensusException e) { + assertTrue(e instanceof IllegalPeerNumException); + } + + try { + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.1", 6667)))); + } catch (ConsensusException e) { + assertTrue(e instanceof IllegalPeerEndpointException); + } + + try { + consensusImpl.createLocalPeer( + schemaRegionId, + Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } } @Test @@ -207,12 +211,13 @@ public class SimpleConsensusTest { assertFalse(response1.isSuccess()); assertTrue(response1.getException() instanceof ConsensusGroupNotExistException); - ConsensusGenericResponse response2 = - consensusImpl.createLocalPeer( - dataRegionId, - Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertTrue(response2.isSuccess()); - assertNull(response2.getException()); + try { + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } ConsensusGenericResponse response3 = consensusImpl.deleteLocalPeer(dataRegionId); assertTrue(response3.isSuccess()); @@ -251,47 +256,64 @@ public class SimpleConsensusTest { @Test public void write() { - ConsensusGenericResponse response1 = - consensusImpl.createLocalPeer( - dataRegionId, - Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertTrue(response1.isSuccess()); - assertNull(response1.getException()); - - ConsensusGenericResponse response2 = - consensusImpl.createLocalPeer( - schemaRegionId, - Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertTrue(response2.isSuccess()); - assertNull(response2.getException()); - - ConsensusGenericResponse response3 = - consensusImpl.createLocalPeer( - configId, - Collections.singletonList(new Peer(configId, 1, new TEndPoint("0.0.0.0", 6667)))); - assertTrue(response3.isSuccess()); - assertNull(response3.getException()); + try { + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } + + try { + consensusImpl.createLocalPeer( + schemaRegionId, + Collections.singletonList(new Peer(schemaRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } + + try { + consensusImpl.createLocalPeer( + configId, + Collections.singletonList(new Peer(configId, 1, new TEndPoint("0.0.0.0", 6667)))); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } // test new TestStateMachine(true), should return 1; - ConsensusWriteResponse response4 = consensusImpl.write(dataRegionId, entry1); - assertNull(response4.getException()); - assertNotNull(response4.getStatus()); - assertEquals(-1, response4.getStatus().getCode()); + try { + TSStatus response4 = consensusImpl.write(dataRegionId, entry1); + assertNotNull(response4); + assertEquals(-1, response4.getCode()); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } // test new TestStateMachine(false), should return -1; - ConsensusWriteResponse response5 = consensusImpl.write(schemaRegionId, entry1); - assertNull(response5.getException()); - assertNotNull(response5.getStatus()); - assertEquals(1, response5.getStatus().getCode()); + try { + TSStatus response5 = consensusImpl.write(schemaRegionId, entry1); + assertNotNull(response5); + assertEquals(1, response5.getCode()); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } // test new EmptyStateMachine(), should return 0; - ConsensusWriteResponse response6 = consensusImpl.write(configId, entry1); - assertNull(response6.getException()); - assertEquals(0, response6.getStatus().getCode()); + try { + TSStatus response6 = consensusImpl.write(configId, entry1); + assertNull(response6); + assertEquals(0, response6.getCode()); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } // test ByteBufferConsensusRequest, should return 0; - ConsensusWriteResponse response7 = consensusImpl.write(dataRegionId, entry2); - assertNull(response7.getException()); - assertEquals(0, response7.getStatus().getCode()); + try { + TSStatus response7 = consensusImpl.write(dataRegionId, entry2); + assertNull(response7); + assertEquals(0, response7.getCode()); + } catch (ConsensusException e) { + throw new RuntimeException(e); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 6c150b8fc70..4866d77bcfc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -51,6 +51,7 @@ import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; import org.apache.iotdb.db.auth.AuthorizerManager; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -1750,21 +1751,23 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface peers, regionId); TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - ConsensusGenericResponse resp; - if (regionId instanceof DataRegionId) { - resp = DataRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers); - } else { - resp = SchemaRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers); - } - if (!resp.isSuccess()) { + try { + if (regionId instanceof DataRegionId) { + DataRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers); + } else { + SchemaRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers); + } + } catch (ConsensusException e) { + LOGGER.error( + "Something wrong happened while calling consensus layer's createLocalPeer API.", e); LOGGER.warn( "{}, CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage", REGION_MIGRATE_PROCESS, peers, regionId, - resp.getException()); + e); status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage(resp.getException().getMessage()); + status.setMessage(e.getMessage()); return status; } LOGGER.info( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java index fdff724a3dc..52d7cc37cd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java @@ -31,7 +31,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.consensus.common.Peer; -import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; @@ -118,18 +118,16 @@ public class DataNodeRegionManager { dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort()); peers.add(new Peer(schemaRegionId, dataNodeLocation.getDataNodeId(), endpoint)); } - ConsensusGenericResponse consensusGenericResponse = - SchemaRegionConsensusImpl.getInstance().createLocalPeer(schemaRegionId, peers); - if (consensusGenericResponse.isSuccess()) { + try { + SchemaRegionConsensusImpl.getInstance().createLocalPeer(schemaRegionId, peers); tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } else if (consensusGenericResponse.getException() - instanceof ConsensusGroupAlreadyExistException) { + } catch (ConsensusGroupAlreadyExistException e) { tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); tsStatus.setMessage( String.format("SchemaRegion %d already exists.", schemaRegionId.getId())); - } else { + } catch (ConsensusException e) { tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()); - tsStatus.setMessage(consensusGenericResponse.getException().getMessage()); + tsStatus.setMessage(e.getMessage()); } } catch (IllegalPathException e1) { LOGGER.error("Create Schema Region {} failed because path is illegal.", storageGroup); @@ -159,17 +157,15 @@ public class DataNodeRegionManager { dataNodeLocation.getDataRegionConsensusEndPoint().getPort()); peers.add(new Peer(dataRegionId, dataNodeLocation.getDataNodeId(), endpoint)); } - ConsensusGenericResponse consensusGenericResponse = - DataRegionConsensusImpl.getInstance().createLocalPeer(dataRegionId, peers); - if (consensusGenericResponse.isSuccess()) { + try { + DataRegionConsensusImpl.getInstance().createLocalPeer(dataRegionId, peers); tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } else if (consensusGenericResponse.getException() - instanceof ConsensusGroupAlreadyExistException) { + } catch (ConsensusGroupAlreadyExistException e) { tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); tsStatus.setMessage(String.format("DataRegion %d already exists.", dataRegionId.getId())); - } else { + } catch (ConsensusException e) { tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()); - tsStatus.setMessage(consensusGenericResponse.getException().getMessage()); + tsStatus.setMessage(e.getMessage()); } } catch (DataRegionException e) { LOGGER.error("Create Data Region {} failed because {}", storageGroup, e.getMessage());
