This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch load_v2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fc02a0a59515e52db9bef20244917446b87f6bcf Merge: 7309dd7b48b 7640df9a2b9 Author: Tian Jiang <[email protected]> AuthorDate: Wed Nov 22 09:29:13 2023 +0800 Merge branch 'master' into load_v2 # Conflicts: # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java .../java/org/apache/iotdb/CountPointProcessor.java | 3 +- .../org/apache/iotdb/it/utils/TsFileGenerator.java | 14 +- .../constant/BuiltinAggregationFunctionEnum.java | 6 + .../apache/iotdb/itbase/constant/TestConstant.java | 24 + .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 4 +- .../db/it/aggregation/IoTDBAggregationIT.java | 10 +- .../aggregation/IoTDBAggregationSmallDataIT.java | 4 +- .../iotdb/db/it/aggregation/IoTDBVarianceIT.java | 733 +++++++++++++++++++++ .../IoTDBAlignByDeviceWithTemplateIT.java | 584 ++++++++++++++++ .../org/apache/iotdb/db/it/auth/IoTDBAuthIT.java | 55 +- .../iotdb/libudf/it/dprofile/DProfileIT.java | 27 - .../iotdb/session/it/IoTDBSessionSimpleIT.java | 64 +- .../java/org/apache/iotdb/tool/ExportTsFile.java | 8 +- .../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java | 2 +- .../org/apache/iotdb/jdbc/IoTDBDataSource.java | 6 +- .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 86 +-- .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 2 +- .../java/org/apache/iotdb/jdbc/StringUtils.java | 8 +- .../org/apache/iotdb/session/pool/SessionPool.java | 347 +++++----- .../sql/factory/IoTDBDynamicTableFactory.java | 6 + .../iotdb/confignode/persistence/AuthorInfo.java | 174 +++-- .../confignode/persistence/AuthorInfoTest.java | 134 ++-- .../service/IoTConsensusRPCServiceProcessor.java | 27 +- .../iotdb/consensus/ratis/RatisConsensus.java | 2 +- .../db/pipe/agent/plugin/PipePluginAgent.java | 6 +- .../db/pipe/commit/PipeEventCommitManager.java | 106 +++ .../iotdb/db/pipe/commit/PipeEventCommitter.java | 79 +++ .../env/PipeTaskConnectorRuntimeEnvironment.java} | 15 +- ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java | 49 -- ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 45 +- .../builder/PipeTransferBatchReqBuilder.java | 45 ++ .../request/PipeTransferTabletRawReq.java | 2 +- .../thrift/async/IoTDBThriftAsyncConnector.java | 112 +--- .../PipeTransferTabletBatchEventHandler.java | 13 +- .../PipeTransferTabletInsertNodeEventHandler.java | 3 +- .../PipeTransferTabletInsertionEventHandler.java | 39 +- .../handler/PipeTransferTabletRawEventHandler.java | 3 +- .../PipeTransferTsFileInsertionEventHandler.java | 20 +- .../protocol/websocket/WebSocketConnector.java | 53 +- .../websocket/WebSocketConnectorServer.java | 83 +-- .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 39 +- .../event/common/heartbeat/PipeHeartbeatEvent.java | 10 +- .../db/pipe/event/common/row/PipeRowCollector.java | 8 +- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 18 +- .../common/tablet/PipeRawTabletInsertionEvent.java | 16 +- .../tablet/TabletInsertionDataContainer.java | 10 +- .../common/tsfile/PipeTsFileInsertionEvent.java | 17 +- .../tsfile/TsFileInsertionDataContainer.java | 14 +- .../db/pipe/event/realtime/PipeRealtimeEvent.java | 8 +- .../execution/executor/PipeSubtaskExecutor.java | 1 + .../PipeHistoricalDataRegionTsFileExtractor.java | 3 + .../realtime/assigner/PipeDataRegionAssigner.java | 2 +- .../legacy/IoTDBLegacyPipeReceiverAgent.java | 11 +- .../receiver/legacy/loader/DeletionLoader.java | 5 +- .../pipe/receiver/legacy/loader/TsFileLoader.java | 5 +- .../receiver/thrift/IoTDBThriftReceiverV1.java | 2 +- .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 3 +- .../pipe/task/connection/PipeEventCollector.java | 10 +- .../db/pipe/task/stage/PipeTaskConnectorStage.java | 17 +- .../db/pipe/task/stage/PipeTaskProcessorStage.java | 2 +- .../iotdb/db/pipe/task/stage/PipeTaskStage.java | 6 +- .../subtask/connector/PipeConnectorSubtask.java | 11 + .../connector/PipeConnectorSubtaskLifeCycle.java | 51 +- .../connector/PipeConnectorSubtaskManager.java | 32 +- .../common/schematree/ClusterSchemaTree.java | 10 +- .../queryengine/common/schematree/ISchemaTree.java | 3 + .../visitor/SchemaTreeVisitorFactory.java | 5 + .../execution/aggregation/Accumulator.java | 9 + .../execution/aggregation/AccumulatorFactory.java | 10 + .../execution/aggregation/AvgAccumulator.java | 13 + .../execution/aggregation/CountAccumulator.java | 9 + .../execution/aggregation/SumAccumulator.java | 9 + .../execution/aggregation/VarianceAccumulator.java | 273 ++++++++ .../slidingwindow/SlidingWindowAggregator.java | 34 - .../SlidingWindowAggregatorFactory.java | 6 + .../SmoothQueueSlidingWindowAggregator.java | 4 +- .../execution/driver/DriverContext.java | 8 - .../execution/operator/OperatorContext.java | 12 +- .../operator/source/AlignedSeriesScanOperator.java | 8 +- .../operator/source/AlignedSeriesScanUtil.java | 13 +- .../timer/RuleBasedTimeSliceAllocator.java | 69 -- .../iotdb/db/queryengine/plan/Coordinator.java | 1 - .../db/queryengine/plan/analyze/Analysis.java | 88 ++- .../queryengine/plan/analyze/AnalyzeVisitor.java | 56 +- .../plan/analyze/ExpressionTypeAnalyzer.java | 11 + .../plan/analyze/LoadTsfileAnalyzer.java | 233 +++---- .../queryengine/plan/analyze/TemplatedAnalyze.java | 403 +++++++++++ .../db/queryengine/plan/analyze/TypeProvider.java | 65 +- .../queryengine/plan/execution/QueryExecution.java | 1 + .../db/queryengine/plan/parser/ASTVisitor.java | 6 + .../plan/planner/LocalExecutionPlanContext.java | 11 - .../plan/planner/LogicalPlanBuilder.java | 21 +- .../plan/planner/LogicalPlanVisitor.java | 8 +- .../plan/planner/OperatorTreeGenerator.java | 138 ++-- .../plan/planner/SubPlanTypeExtractor.java | 16 +- .../plan/planner/TemplatedLogicalPlan.java | 195 ++++++ .../plan/planner/TemplatedLogicalPlanBuilder.java | 145 ++++ .../planner/distribution/ExchangeNodeAdder.java | 3 - .../plan/planner/distribution/SourceRewriter.java | 24 +- .../plan/node/load/LoadSingleTsFileNode.java | 4 +- .../plan/node/load/LoadTsFilePieceNode.java | 8 +- .../plan/parameter/AggregationDescriptor.java | 18 + .../scheduler/load/LoadTsFileDispatcherImpl.java | 18 +- .../crud/InsertMultiTabletsStatement.java | 9 + .../mtree/impl/pbtree/CachedMTreeStore.java | 16 +- .../schemaregion/mtree/traverser/Traverser.java | 2 +- .../db/storageengine/dataregion/DataRegion.java | 26 +- .../impl/ReadChunkCompactionPerformer.java | 5 +- .../execute/utils/CompactionPathUtils.java} | 21 +- .../execute/utils/MultiTsFileDeviceIterator.java | 7 +- .../fast/AlignedSeriesCompactionExecutor.java | 21 +- .../fast/NonAlignedSeriesCompactionExecutor.java | 12 +- .../io/LocalTextModificationAccessor.java | 3 + .../trigger/service/TriggerInformationUpdater.java | 2 +- .../org/apache/iotdb/db/utils/SchemaUtils.java | 24 + .../apache/iotdb/db/utils/TypeInferenceUtils.java | 20 +- .../iotdb/db/utils/constant/SqlConstant.java | 6 + .../db/auth/role/LocalFileRoleAccessorTest.java | 55 +- .../db/auth/role/LocalFileRoleManagerTest.java | 124 ++-- .../db/auth/user/LocalFileUserAccessorTest.java | 74 +-- .../db/auth/user/LocalFileUserManagerTest.java | 48 -- .../event/TsFileInsertionDataContainerTest.java | 2 +- .../execution/aggregation/AccumulatorTest.java | 332 ++++++++++ .../operator/AlignedSeriesScanOperatorTest.java | 15 +- .../execution/operator/OperatorMemoryTest.java | 3 +- .../distribution/DistributionPlannerCycleTest.java | 9 +- .../queryengine/plan/plan/distribution/Util.java | 2 +- .../queryengine/plan/plan/distribution/Util2.java | 16 +- .../commons/auth/authorizer/BasicAuthorizer.java | 4 + .../commons/auth/entity/PriPrivilegeType.java | 52 +- .../iotdb/commons/auth/role/BasicRoleManager.java | 92 ++- .../iotdb/commons/auth/user/BasicUserManager.java | 86 +-- .../udf/builtin/BuiltinAggregationFunction.java | 20 +- .../org/apache/iotdb/commons/utils/AuthUtils.java | 48 +- .../org/apache/iotdb/commons/utils/IOUtils.java | 28 +- ...ReaderTimeseriesMetadataIteratorException.java} | 12 +- .../file/metadata/enums/MetadataIndexNodeType.java | 2 +- ...leSequenceReaderTimeseriesMetadataIterator.java | 220 +++++++ ...quenceReaderTimeseriesMetadataIteratorTest.java | 62 ++ .../thrift-commons/src/main/thrift/common.thrift | 8 +- library-udf/src/assembly/tools/register-UDF.bat | 1 - library-udf/src/assembly/tools/register-UDF.sh | 1 - .../apache/iotdb/library/dprofile/UDAFStddev.java | 66 -- 143 files changed, 5117 insertions(+), 1711 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java index e501d322e53,b51b4284c8c..7b224c996d3 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java @@@ -72,34 -57,15 +72,29 @@@ import org.apache.thrift.async.AsyncMet import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; - import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.List; +import java.util.Map; - import java.util.Optional; - import java.util.PriorityQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; - import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_CONFIG_NODES_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_SIZE_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_SIZE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY; @@@ -116,19 -82,16 +111,20 @@@ public class IoTDBThriftAsyncConnector ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>(); private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> asyncPipeDataTransferClientManager; + private static final AtomicReference<IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>> + SYNC_DATA_NODE_CLIENT_MANAGER_HOLDER = new AtomicReference<>(); + private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> + syncDataNodeClientManager; private final IoTDBThriftSyncConnector retryConnector = new IoTDBThriftSyncConnector(); - private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue = - new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left)); - - private final AtomicLong commitIdGenerator = new AtomicLong(0); - private final AtomicLong lastCommitId = new AtomicLong(0); - private final PriorityQueue<Pair<Long, Runnable>> commitQueue = - new PriorityQueue<>(Comparator.comparing(o -> o.left)); + private final PriorityBlockingQueue<Event> retryEventQueue = + new PriorityBlockingQueue<>( + 11, + Comparator.comparing( + e -> + // Non-enriched events will be put at the front of the queue, + // because they are more likely to be lost and need to be retried first. + e instanceof EnrichedEvent ? ((EnrichedEvent) e).getCommitId() : 0)); private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder; @@@ -392,99 -298,12 +386,97 @@@ throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath()); } - final long requestCommitId = commitIdGenerator.incrementAndGet(); final PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler = - new PipeTransferTsFileInsertionEventHandler( - requestCommitId, pipeTsFileInsertionEvent, this); + new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this); - transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler); + transfer(pipeTsFileInsertionEvent.getCommitId(), pipeTransferTsFileInsertionEventHandler); } + @Override + public void transfer(TsFileBatchInsertionEvent tsFileInsertionEvent) throws Exception { + transferQueuedEventsIfNecessary(); + transferBatchedEventsIfNecessary(); + + if (!(tsFileInsertionEvent instanceof PipeBatchTsFileInsertionEvent)) { + LOGGER.warn( + "IoTDBThriftAsyncConnector only support PipeBatchTsFileInsertionEvent. Current event: {}.", + tsFileInsertionEvent); + return; + } + + if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { + try { + for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { + transfer(event); + } + } finally { + tsFileInsertionEvent.close(); + } + return; + } + + final PipeBatchTsFileInsertionEvent pipeTsFileInsertionEvent = + (PipeBatchTsFileInsertionEvent) tsFileInsertionEvent; + pipeTsFileInsertionEvent.waitForTsFileClose(); + + if (!useLocalSplit) { + List<PipeTsFileInsertionEvent> pipeTsFileInsertionEvents = + pipeTsFileInsertionEvent.toSingleFileEvents(); + for (PipeTsFileInsertionEvent fileInsertionEvent : pipeTsFileInsertionEvents) { + transfer(fileInsertionEvent); + } + } else { + final long requestCommitId = commitIdGenerator.incrementAndGet(); + transfer(requestCommitId, pipeTsFileInsertionEvent); + } + } + + private void transfer( + long requestCommitId, PipeBatchTsFileInsertionEvent pipeTsFileInsertionEvent) + throws IOException { + LoadTsFileNode loadTsFileNode = + new LoadTsFileNode( + new PlanNodeId("Pipe-" + requestCommitId), pipeTsFileInsertionEvent.getResources()); + + IPartitionFetcher partitionFetcher = + new ExternalPartitionFetcher(targetConfigNodes, thriftClientProperty, targetSeriesSlotNum); + DataPartitionBatchFetcher dataPartitionBatchFetcher = + new DataPartitionBatchFetcher(partitionFetcher); + TsFileSplitSender splitSender = + new TsFileSplitSender( + loadTsFileNode, + dataPartitionBatchFetcher, + targetPartitionInterval, + syncDataNodeClientManager, + false, + splitMaxSize, + maxConcurrentFileNum, + targetUserName, + targetPassword); + splitSender.start(); + LOGGER.info( + "Sending {} files to {} complete", + pipeTsFileInsertionEvent.getTsFiles().size(), + targetConfigNodes); + + if (splitSender.getStatistic().isHasP2Timeout()) { + double throughput = splitSender.getStatistic().p2ThroughputMbps(); + Map<String, Object> param = new HashMap<>(2); + param.put( + PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS, + splitSender.getStatistic().getP2Timeout()); + param.put(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY, throughput); + pipeTsFileInsertionEvent.getExtractorOnConnectorTimeout().apply(param); + } else { + double throughput = splitSender.getStatistic().p2ThroughputMbps(); + pipeTsFileInsertionEvent + .getExtractorOnConnectorSuccess() + .apply( + Collections.singletonMap( + PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY, throughput)); + } + } + private void transfer( long requestCommitId, PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler) { diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index f561d38568e,522622bca6c..ef45119441a --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@@ -72,18 -72,19 +72,19 @@@ public class PipeHistoricalDataRegionTs private static final Map<Integer, Long> DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>(); private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000; - private String pipeName; - private PipeTaskMeta pipeTaskMeta; - private ProgressIndex startIndex; ++ protected String pipeName; + protected PipeTaskMeta pipeTaskMeta; + protected ProgressIndex startIndex; - private int dataRegionId; + protected int dataRegionId; - private String pattern; + protected String pattern; private boolean isDbNameCoveredByPattern = false; - private long historicalDataExtractionStartTime; // Event time - private long historicalDataExtractionEndTime; // Event time + protected long historicalDataExtractionStartTime; // Event time + protected long historicalDataExtractionEndTime; // Event time - private long historicalDataExtractionTimeLowerBound; // Arrival time + protected long historicalDataExtractionTimeLowerBound; // Arrival time private boolean sloppyTimeRange; // true to disable time range filter after extraction
