This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch cluster- in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2fbbdc9d8c5f8348de38316601d3ad3f14db8256 Author: xiangdong huang <[email protected]> AuthorDate: Wed Aug 11 01:06:08 2021 +0800 add lost codes.. --- .../server/raft/AbstractDataRaftService.java | 53 ++++++++++++++++ .../server/raft/AbstractMetaRaftService.java | 51 +++++++++++++++ .../cluster/server/raft/AbstractRaftService.java | 74 ++++++++++++++++++++++ .../server/raft/DataRaftHeartBeatService.java | 67 ++++++++++++++++++++ .../iotdb/cluster/server/raft/DataRaftService.java | 64 +++++++++++++++++++ .../server/raft/MetaRaftHeartBeatService.java | 67 ++++++++++++++++++++ .../iotdb/cluster/server/raft/MetaRaftService.java | 64 +++++++++++++++++++ 7 files changed, 440 insertions(+) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java new file mode 100644 index 0000000..23fcc59 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractDataRaftService.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.server.raft; + +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.rpc.thrift.TSDataService; +import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls; + +public abstract class AbstractDataRaftService extends AbstractRaftService { + + private DataGroupServiceImpls impl; + + @Override + public void initSyncedServiceImpl(Object serviceImpl) { + if (impl != null) { + impl = (DataGroupServiceImpls) serviceImpl; + } + super.initSyncedServiceImpl(serviceImpl); + } + + @Override + public void initAsyncedServiceImpl(Object serviceImpl) { + if (impl != null) { + impl = (DataGroupServiceImpls) serviceImpl; + } + super.initAsyncedServiceImpl(serviceImpl); + } + + @Override + public void initTProcessor() { + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + processor = new TSDataService.AsyncProcessor<>(impl); + } else { + processor = new TSDataService.Processor<>(impl); + } + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java new file mode 100644 index 0000000..905e0fa --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractMetaRaftService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.server.raft; + +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.rpc.thrift.TSMetaService; +import org.apache.iotdb.cluster.server.service.MetaAsyncService; +import org.apache.iotdb.cluster.server.service.MetaSyncService; + +public abstract class AbstractMetaRaftService extends AbstractRaftService { + + private MetaAsyncService asyncServiceImpl; + private MetaSyncService syncServiceImpl; + + @Override + public void initSyncedServiceImpl(Object serviceImpl) { + syncServiceImpl = (MetaSyncService) serviceImpl; + super.initSyncedServiceImpl(serviceImpl); + } + + @Override + public void initAsyncedServiceImpl(Object serviceImpl) { + asyncServiceImpl = (MetaAsyncService) serviceImpl; + super.initAsyncedServiceImpl(serviceImpl); + } + + @Override + public void initTProcessor() { + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + processor = new TSMetaService.AsyncProcessor<>(asyncServiceImpl); + } else { + processor = new TSMetaService.Processor<>(syncServiceImpl); + } + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java new file mode 100644 index 0000000..d62024b --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.server.raft; + +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.rpc.thrift.TSMetaService; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.runtime.RPCServiceException; +import org.apache.iotdb.db.service.thrift.ThriftService; +import org.apache.iotdb.db.service.thrift.ThriftServiceThread; + +public abstract class AbstractRaftService extends ThriftService { + + public void initThriftServiceThread( + String daemonThreadName, String clientThreadName, ThriftServiceThread.ServerType serverType) + throws IllegalAccessException { + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + try { + if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + thriftServiceThread = + new ThriftServiceThread( + (TSMetaService.AsyncProcessor) processor, + getID().getName(), + clientThreadName, + getBindIP(), + getBindPort(), + config.getRpcMaxConcurrentClientNum(), + config.getThriftServerAwaitTimeForStopService(), + new RaftServiceHandler(), + false, + ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS(), + config.getThriftMaxFrameSize(), + serverType); + } else { + thriftServiceThread = + new ThriftServiceThread( + processor, + getID().getName(), + clientThreadName, + getBindIP(), + getBindPort(), + config.getRpcMaxConcurrentClientNum(), + config.getThriftServerAwaitTimeForStopService(), + new RaftServiceHandler(), + false); + } + } catch (RPCServiceException e) { + throw new IllegalAccessException(e.getMessage()); + } + thriftServiceThread.setName(daemonThreadName); + } + + @Override + public String getBindIP() { + return ClusterDescriptor.getInstance().getConfig().getInternalIp(); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java new file mode 100644 index 0000000..54a66b0 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftHeartBeatService.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.server.raft; + +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.utils.ClusterUtils; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.service.ServiceType; +import org.apache.iotdb.db.service.thrift.ThriftService; +import org.apache.iotdb.db.service.thrift.ThriftServiceThread; + +public class DataRaftHeartBeatService extends AbstractDataRaftService + implements DataRaftHeartBeatServiceMBean { + + private DataRaftHeartBeatService() {} + + @Override + public ThriftService getImplementation() { + return DataRaftHeartBeatServiceHolder.INSTANCE; + } + + @Override + public ServiceType getID() { + return ServiceType.CLUSTER_DATA_HEART_BEAT_RPC_SERVICE; + } + + @Override + public void initThriftServiceThread() throws IllegalAccessException { + initThriftServiceThread( + ThreadName.CLUSTER_DATA_HEARTBEAT_RPC_SERVICE.getName(), + ThreadName.CLUSTER_DATA_HEARTBEAT_RPC_CLIENT.getName(), + ThriftServiceThread.ServerType.HSHA); + } + + @Override + public int getBindPort() { + return ClusterDescriptor.getInstance().getConfig().getInternalDataPort() + + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET; + } + + public static DataRaftHeartBeatService getInstance() { + return DataRaftHeartBeatServiceHolder.INSTANCE; + } + + private static class DataRaftHeartBeatServiceHolder { + + private static final DataRaftHeartBeatService INSTANCE = new DataRaftHeartBeatService(); + + private DataRaftHeartBeatServiceHolder() {} + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java new file mode 100644 index 0000000..01cde6e --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/DataRaftService.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.server.raft; + +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.service.ServiceType; +import org.apache.iotdb.db.service.thrift.ThriftService; +import org.apache.iotdb.db.service.thrift.ThriftServiceThread; + +public class DataRaftService extends AbstractDataRaftService implements DataRaftServiceMBean { + + private DataRaftService() {} + + @Override + public ThriftService getImplementation() { + return DataRaftServiceHolder.INSTANCE; + } + + @Override + public ServiceType getID() { + return ServiceType.CLUSTER_DATA_RPC_SERVICE; + } + + @Override + public void initThriftServiceThread() throws IllegalAccessException { + initThriftServiceThread( + ThreadName.CLUSTER_DATA_RPC_SERVICE.getName(), + ThreadName.CLUSTER_DATA_RPC_CLIENT.getName(), + ThriftServiceThread.ServerType.SELECTOR); + } + + @Override + public int getBindPort() { + return ClusterDescriptor.getInstance().getConfig().getInternalDataPort(); + } + + public static DataRaftService getInstance() { + return DataRaftServiceHolder.INSTANCE; + } + + private static class DataRaftServiceHolder { + + private static final DataRaftService INSTANCE = new DataRaftService(); + + private DataRaftServiceHolder() {} + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java new file mode 100644 index 0000000..fc463c4 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftHeartBeatService.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.server.raft; + +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.utils.ClusterUtils; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.service.ServiceType; +import org.apache.iotdb.db.service.thrift.ThriftService; +import org.apache.iotdb.db.service.thrift.ThriftServiceThread; + +public class MetaRaftHeartBeatService extends AbstractMetaRaftService + implements MetaRaftHeartBeatServiceMBean { + + private MetaRaftHeartBeatService() {} + + @Override + public ThriftService getImplementation() { + return MetaRaftHeartBeatServiceHolder.INSTANCE; + } + + @Override + public ServiceType getID() { + return ServiceType.CLUSTER_META_HEART_BEAT_RPC_SERVICE; + } + + @Override + public void initThriftServiceThread() throws IllegalAccessException { + initThriftServiceThread( + ThreadName.CLUSTER_META_HEARTBEAT_RPC_SERVICE.getName(), + ThreadName.CLUSTER_META_HEARTBEAT_RPC_CLIENT.getName(), + ThriftServiceThread.ServerType.HSHA); + } + + @Override + public int getBindPort() { + return ClusterDescriptor.getInstance().getConfig().getInternalMetaPort() + + ClusterUtils.META_HEARTBEAT_PORT_OFFSET; + } + + public static MetaRaftHeartBeatService getInstance() { + return MetaRaftHeartBeatServiceHolder.INSTANCE; + } + + private static class MetaRaftHeartBeatServiceHolder { + + private static final MetaRaftHeartBeatService INSTANCE = new MetaRaftHeartBeatService(); + + private MetaRaftHeartBeatServiceHolder() {} + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java new file mode 100644 index 0000000..d67f04d --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/MetaRaftService.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.server.raft; + +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.service.ServiceType; +import org.apache.iotdb.db.service.thrift.ThriftService; +import org.apache.iotdb.db.service.thrift.ThriftServiceThread; + +public class MetaRaftService extends AbstractMetaRaftService implements MetaRaftServiceMBean { + + private MetaRaftService() {} + + @Override + public ThriftService getImplementation() { + return MetaRaftServiceHolder.INSTANCE; + } + + @Override + public ServiceType getID() { + return ServiceType.CLUSTER_META_RPC_SERVICE; + } + + @Override + public void initThriftServiceThread() throws IllegalAccessException { + initThriftServiceThread( + ThreadName.CLUSTER_META_RPC_SERVICE.getName(), + ThreadName.CLUSTER_META_RPC_CLIENT.getName(), + ThriftServiceThread.ServerType.SELECTOR); + } + + @Override + public int getBindPort() { + return ClusterDescriptor.getInstance().getConfig().getInternalMetaPort(); + } + + public static MetaRaftService getInstance() { + return MetaRaftServiceHolder.INSTANCE; + } + + private static class MetaRaftServiceHolder { + + private static final MetaRaftService INSTANCE = new MetaRaftService(); + + private MetaRaftServiceHolder() {} + } +}
