This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_sync in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fca6f8ecf65f2f4aa462e7c0c8abb0000d54a3b6 Author: qiaojialin <[email protected]> AuthorDate: Wed Jul 13 17:46:57 2022 +0800 support mqtt operation sync --- .../org/apache/iotdb/db/engine/StorageEngine.java | 109 ++++++++++++++++++++- .../iotdb/db/protocol/mqtt/PublishHandler.java | 9 +- .../db/service/thrift/impl/TSServiceImpl.java | 104 +------------------- 3 files changed, 118 insertions(+), 104 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index a1c6a056d9..669957515d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -23,6 +23,13 @@ import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.ServerConfigConsistent; +import org.apache.iotdb.db.doublelive.OperationSyncConsumer; +import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector; +import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector; +import org.apache.iotdb.db.doublelive.OperationSyncLogService; +import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils; +import org.apache.iotdb.db.doublelive.OperationSyncProducer; +import org.apache.iotdb.db.doublelive.OperationSyncWriteTask; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.flush.CloseFileListener; import org.apache.iotdb.db.engine.flush.FlushListener; @@ -50,6 +57,7 @@ import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; @@ -64,6 +72,7 @@ import org.apache.iotdb.db.utils.UpgradeUtils; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; +import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.utils.FilePathUtils; import org.apache.iotdb.tsfile.utils.Pair; @@ -71,8 +80,11 @@ import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -86,6 +98,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -101,6 +115,14 @@ public class StorageEngine implements IService { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final long TTL_CHECK_INTERVAL = 60 * 1000L; + /* OperationSync module */ + private static final boolean isEnableOperationSync = + IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync(); + private static SessionPool operationSyncsessionPool; + private static OperationSyncProducer operationSyncProducer; + private static OperationSyncDDLProtector operationSyncDDLProtector; + private static OperationSyncLogService operationSyncDDLLogService; + /** * Time range for dividing storage group, the time unit is the same with IoTDB's * TimestampPrecision @@ -135,7 +157,53 @@ public class StorageEngine implements IService { private List<CloseFileListener> customCloseFileListeners = new ArrayList<>(); private List<FlushListener> customFlushListeners = new ArrayList<>(); - private StorageEngine() {} + private StorageEngine() { + if (isEnableOperationSync) { + /* Open OperationSync */ + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + // create SessionPool for OperationSync + operationSyncsessionPool = + new SessionPool( + config.getSecondaryAddress(), + config.getSecondaryPort(), + config.getSecondaryUser(), + config.getSecondaryPassword(), + 5); + + // create operationSyncDDLProtector and operationSyncDDLLogService + operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool); + new Thread(operationSyncDDLProtector).start(); + operationSyncDDLLogService = + new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector); + new Thread(operationSyncDDLLogService).start(); + + // create OperationSyncProducer + BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>> + blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize()); + operationSyncProducer = new OperationSyncProducer(blockingQueue); + + // create OperationSyncDMLProtector and OperationSyncDMLLogService + OperationSyncDMLProtector operationSyncDMLProtector = + new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer); + new Thread(operationSyncDMLProtector).start(); + OperationSyncLogService operationSyncDMLLogService = + new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector); + new Thread(operationSyncDMLLogService).start(); + + // create OperationSyncConsumer + for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) { + OperationSyncConsumer consumer = + new OperationSyncConsumer( + blockingQueue, operationSyncsessionPool, operationSyncDMLLogService); + new Thread(consumer).start(); + } + } else { + operationSyncsessionPool = null; + operationSyncProducer = null; + operationSyncDDLProtector = null; + operationSyncDDLLogService = null; + } + } public static StorageEngine getInstance() { return InstanceHolder.INSTANCE; @@ -147,6 +215,45 @@ public class StorageEngine implements IService { IoTDBDescriptor.getInstance().getConfig().getPartitionInterval() * 1000L); } + public static void transmitOperationSync(PhysicalPlan physicalPlan) { + + OperationSyncPlanTypeUtils.OperationSyncPlanType planType = + OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan); + if (planType == null) { + // Don't need OperationSync + return; + } + + // serialize physical plan + ByteBuffer buffer; + try { + int size = physicalPlan.getSerializedSize(); + ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size); + DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream); + physicalPlan.serialize(operationSyncSerializeStream); + buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray()); + } catch (IOException e) { + logger.error("OperationSync can't serialize PhysicalPlan", e); + return; + } + + switch (planType) { + case DDLPlan: + // Create OperationSyncWriteTask and wait + OperationSyncWriteTask ddlTask = + new OperationSyncWriteTask( + buffer, + operationSyncsessionPool, + operationSyncDDLProtector, + operationSyncDDLLogService); + ddlTask.run(); + break; + case DMLPlan: + // Put into OperationSyncProducer + operationSyncProducer.put(new Pair<>(buffer, planType)); + } + } + public static long convertMilliWithPrecision(long milliTime) { long result = milliTime; String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision(); diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java index 3bf7144e73..a4e707a6d5 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java @@ -18,6 +18,8 @@ package org.apache.iotdb.db.protocol.mqtt; import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.service.IoTDB; @@ -44,7 +46,8 @@ public class PublishHandler extends AbstractInterceptHandler { private final ServiceProvider serviceProvider = IoTDB.serviceProvider; private long sessionId; - + private static final boolean isEnableOperationSync = + IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync(); private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class); private final PayloadFormatter payloadFormat; @@ -123,6 +126,10 @@ public class PublishHandler extends AbstractInterceptHandler { if (tsStatus != null) { LOG.warn(tsStatus.message); } else { + if (isEnableOperationSync) { + // OperationSync should transmit before execute + StorageEngine.transmitOperationSync(plan); + } status = serviceProvider.executeNonQuery(plan); } } catch (Exception e) { diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java index 5717f3ca5e..44a911f6cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java @@ -25,13 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.OperationType; -import org.apache.iotdb.db.doublelive.OperationSyncConsumer; -import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector; -import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector; -import org.apache.iotdb.db.doublelive.OperationSyncLogService; import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils; -import org.apache.iotdb.db.doublelive.OperationSyncProducer; -import org.apache.iotdb.db.doublelive.OperationSyncWriteTask; +import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator; import org.apache.iotdb.db.exception.IoTDBException; import org.apache.iotdb.db.exception.QueryInBatchStatementException; @@ -132,7 +127,6 @@ import org.apache.iotdb.service.rpc.thrift.TSSetUsingTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.service.rpc.thrift.TSTracingInfo; import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; -import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -140,14 +134,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; -import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; @@ -158,8 +149,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -180,13 +169,8 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; /** Thrift RPC implementation at server side. */ public class TSServiceImpl implements TSIService.Iface { - /* OperationSync module */ private static final boolean isEnableOperationSync = IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync(); - private final SessionPool operationSyncsessionPool; - private final OperationSyncProducer operationSyncProducer; - private final OperationSyncDDLProtector operationSyncDDLProtector; - private final OperationSyncLogService operationSyncDDLLogService; protected class QueryTask implements Callable<TSExecuteStatementResp> { @@ -331,51 +315,6 @@ public class TSServiceImpl implements TSIService.Iface { public TSServiceImpl() { super(); serviceProvider = IoTDB.serviceProvider; - if (isEnableOperationSync) { - /* Open OperationSync */ - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - // create SessionPool for OperationSync - operationSyncsessionPool = - new SessionPool( - config.getSecondaryAddress(), - config.getSecondaryPort(), - config.getSecondaryUser(), - config.getSecondaryPassword(), - 5); - - // create operationSyncDDLProtector and operationSyncDDLLogService - operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool); - new Thread(operationSyncDDLProtector).start(); - operationSyncDDLLogService = - new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector); - new Thread(operationSyncDDLLogService).start(); - - // create OperationSyncProducer - BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>> - blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize()); - operationSyncProducer = new OperationSyncProducer(blockingQueue); - - // create OperationSyncDMLProtector and OperationSyncDMLLogService - OperationSyncDMLProtector operationSyncDMLProtector = - new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer); - new Thread(operationSyncDMLProtector).start(); - OperationSyncLogService operationSyncDMLLogService = - new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector); - new Thread(operationSyncDMLLogService).start(); - - // create OperationSyncConsumer - for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) { - OperationSyncConsumer consumer = - new OperationSyncConsumer( - blockingQueue, operationSyncsessionPool, operationSyncDMLLogService); - new Thread(consumer).start(); - } - } else { - operationSyncsessionPool = null; - operationSyncProducer = null; - operationSyncDDLProtector = null; - operationSyncDDLLogService = null; - } } @Override @@ -2224,7 +2163,7 @@ public class TSServiceImpl implements TSIService.Iface { try { if (isEnableOperationSync) { // OperationSync should transmit before execute - transmitOperationSync(plan); + StorageEngine.transmitOperationSync(plan); } return serviceProvider.executeNonQuery(plan) ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully") @@ -2240,45 +2179,6 @@ public class TSServiceImpl implements TSIService.Iface { "Log in failed. Either you are not authorized or the session has timed out."); } - private void transmitOperationSync(PhysicalPlan physicalPlan) { - - OperationSyncPlanTypeUtils.OperationSyncPlanType planType = - OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan); - if (planType == null) { - // Don't need OperationSync - return; - } - - // serialize physical plan - ByteBuffer buffer; - try { - int size = physicalPlan.getSerializedSize(); - ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size); - DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream); - physicalPlan.serialize(operationSyncSerializeStream); - buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray()); - } catch (IOException e) { - LOGGER.error("OperationSync can't serialize PhysicalPlan", e); - return; - } - - switch (planType) { - case DDLPlan: - // Create OperationSyncWriteTask and wait - OperationSyncWriteTask ddlTask = - new OperationSyncWriteTask( - buffer, - operationSyncsessionPool, - operationSyncDDLProtector, - operationSyncDDLLogService); - ddlTask.run(); - break; - case DMLPlan: - // Put into OperationSyncProducer - operationSyncProducer.put(new Pair<>(buffer, planType)); - } - } - /** Add stat of operation into metrics */ private void addOperationLatency(Operation operation, long startTime) { if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnablePerformanceStat()) {
