This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fc02a0a59515e52db9bef20244917446b87f6bcf
Merge: 7309dd7b48b 7640df9a2b9
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Nov 22 09:29:13 2023 +0800

    Merge branch 'master' into load_v2
    
    # Conflicts:
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java

 .../java/org/apache/iotdb/CountPointProcessor.java |   3 +-
 .../org/apache/iotdb/it/utils/TsFileGenerator.java |  14 +-
 .../constant/BuiltinAggregationFunctionEnum.java   |   6 +
 .../apache/iotdb/itbase/constant/TestConstant.java |  24 +
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  |   4 +-
 .../db/it/aggregation/IoTDBAggregationIT.java      |  10 +-
 .../aggregation/IoTDBAggregationSmallDataIT.java   |   4 +-
 .../iotdb/db/it/aggregation/IoTDBVarianceIT.java   | 733 +++++++++++++++++++++
 .../IoTDBAlignByDeviceWithTemplateIT.java          | 584 ++++++++++++++++
 .../org/apache/iotdb/db/it/auth/IoTDBAuthIT.java   |  55 +-
 .../iotdb/libudf/it/dprofile/DProfileIT.java       |  27 -
 .../iotdb/session/it/IoTDBSessionSimpleIT.java     |  64 +-
 .../java/org/apache/iotdb/tool/ExportTsFile.java   |   8 +-
 .../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java     |   2 +-
 .../org/apache/iotdb/jdbc/IoTDBDataSource.java     |   6 +-
 .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java   |  86 +--
 .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java  |   2 +-
 .../java/org/apache/iotdb/jdbc/StringUtils.java    |   8 +-
 .../org/apache/iotdb/session/pool/SessionPool.java | 347 +++++-----
 .../sql/factory/IoTDBDynamicTableFactory.java      |   6 +
 .../iotdb/confignode/persistence/AuthorInfo.java   | 174 +++--
 .../confignode/persistence/AuthorInfoTest.java     | 134 ++--
 .../service/IoTConsensusRPCServiceProcessor.java   |  27 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |   2 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   6 +-
 .../db/pipe/commit/PipeEventCommitManager.java     | 106 +++
 .../iotdb/db/pipe/commit/PipeEventCommitter.java   |  79 +++
 .../env/PipeTaskConnectorRuntimeEnvironment.java}  |  15 +-
 ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java |  49 --
 ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java |  45 +-
 .../builder/PipeTransferBatchReqBuilder.java       |  45 ++
 .../request/PipeTransferTabletRawReq.java          |   2 +-
 .../thrift/async/IoTDBThriftAsyncConnector.java    | 112 +---
 .../PipeTransferTabletBatchEventHandler.java       |  13 +-
 .../PipeTransferTabletInsertNodeEventHandler.java  |   3 +-
 .../PipeTransferTabletInsertionEventHandler.java   |  39 +-
 .../handler/PipeTransferTabletRawEventHandler.java |   3 +-
 .../PipeTransferTsFileInsertionEventHandler.java   |  20 +-
 .../protocol/websocket/WebSocketConnector.java     |  53 +-
 .../websocket/WebSocketConnectorServer.java        |  83 +--
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |  39 +-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  10 +-
 .../db/pipe/event/common/row/PipeRowCollector.java |   8 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  18 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  16 +-
 .../tablet/TabletInsertionDataContainer.java       |  10 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  17 +-
 .../tsfile/TsFileInsertionDataContainer.java       |  14 +-
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |   8 +-
 .../execution/executor/PipeSubtaskExecutor.java    |   1 +
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   3 +
 .../realtime/assigner/PipeDataRegionAssigner.java  |   2 +-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  11 +-
 .../receiver/legacy/loader/DeletionLoader.java     |   5 +-
 .../pipe/receiver/legacy/loader/TsFileLoader.java  |   5 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |   2 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |   3 +-
 .../pipe/task/connection/PipeEventCollector.java   |  10 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  17 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   2 +-
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |   6 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  11 +
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  51 +-
 .../connector/PipeConnectorSubtaskManager.java     |  32 +-
 .../common/schematree/ClusterSchemaTree.java       |  10 +-
 .../queryengine/common/schematree/ISchemaTree.java |   3 +
 .../visitor/SchemaTreeVisitorFactory.java          |   5 +
 .../execution/aggregation/Accumulator.java         |   9 +
 .../execution/aggregation/AccumulatorFactory.java  |  10 +
 .../execution/aggregation/AvgAccumulator.java      |  13 +
 .../execution/aggregation/CountAccumulator.java    |   9 +
 .../execution/aggregation/SumAccumulator.java      |   9 +
 .../execution/aggregation/VarianceAccumulator.java | 273 ++++++++
 .../slidingwindow/SlidingWindowAggregator.java     |  34 -
 .../SlidingWindowAggregatorFactory.java            |   6 +
 .../SmoothQueueSlidingWindowAggregator.java        |   4 +-
 .../execution/driver/DriverContext.java            |   8 -
 .../execution/operator/OperatorContext.java        |  12 +-
 .../operator/source/AlignedSeriesScanOperator.java |   8 +-
 .../operator/source/AlignedSeriesScanUtil.java     |  13 +-
 .../timer/RuleBasedTimeSliceAllocator.java         |  69 --
 .../iotdb/db/queryengine/plan/Coordinator.java     |   1 -
 .../db/queryengine/plan/analyze/Analysis.java      |  88 ++-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  56 +-
 .../plan/analyze/ExpressionTypeAnalyzer.java       |  11 +
 .../plan/analyze/LoadTsfileAnalyzer.java           | 233 +++----
 .../queryengine/plan/analyze/TemplatedAnalyze.java | 403 +++++++++++
 .../db/queryengine/plan/analyze/TypeProvider.java  |  65 +-
 .../queryengine/plan/execution/QueryExecution.java |   1 +
 .../db/queryengine/plan/parser/ASTVisitor.java     |   6 +
 .../plan/planner/LocalExecutionPlanContext.java    |  11 -
 .../plan/planner/LogicalPlanBuilder.java           |  21 +-
 .../plan/planner/LogicalPlanVisitor.java           |   8 +-
 .../plan/planner/OperatorTreeGenerator.java        | 138 ++--
 .../plan/planner/SubPlanTypeExtractor.java         |  16 +-
 .../plan/planner/TemplatedLogicalPlan.java         | 195 ++++++
 .../plan/planner/TemplatedLogicalPlanBuilder.java  | 145 ++++
 .../planner/distribution/ExchangeNodeAdder.java    |   3 -
 .../plan/planner/distribution/SourceRewriter.java  |  24 +-
 .../plan/node/load/LoadSingleTsFileNode.java       |   4 +-
 .../plan/node/load/LoadTsFilePieceNode.java        |   8 +-
 .../plan/parameter/AggregationDescriptor.java      |  18 +
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |  18 +-
 .../crud/InsertMultiTabletsStatement.java          |   9 +
 .../mtree/impl/pbtree/CachedMTreeStore.java        |  16 +-
 .../schemaregion/mtree/traverser/Traverser.java    |   2 +-
 .../db/storageengine/dataregion/DataRegion.java    |  26 +-
 .../impl/ReadChunkCompactionPerformer.java         |   5 +-
 .../execute/utils/CompactionPathUtils.java}        |  21 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |   7 +-
 .../fast/AlignedSeriesCompactionExecutor.java      |  21 +-
 .../fast/NonAlignedSeriesCompactionExecutor.java   |  12 +-
 .../io/LocalTextModificationAccessor.java          |   3 +
 .../trigger/service/TriggerInformationUpdater.java |   2 +-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |  24 +
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |  20 +-
 .../iotdb/db/utils/constant/SqlConstant.java       |   6 +
 .../db/auth/role/LocalFileRoleAccessorTest.java    |  55 +-
 .../db/auth/role/LocalFileRoleManagerTest.java     | 124 ++--
 .../db/auth/user/LocalFileUserAccessorTest.java    |  74 +--
 .../db/auth/user/LocalFileUserManagerTest.java     |  48 --
 .../event/TsFileInsertionDataContainerTest.java    |   2 +-
 .../execution/aggregation/AccumulatorTest.java     | 332 ++++++++++
 .../operator/AlignedSeriesScanOperatorTest.java    |  15 +-
 .../execution/operator/OperatorMemoryTest.java     |   3 +-
 .../distribution/DistributionPlannerCycleTest.java |   9 +-
 .../queryengine/plan/plan/distribution/Util.java   |   2 +-
 .../queryengine/plan/plan/distribution/Util2.java  |  16 +-
 .../commons/auth/authorizer/BasicAuthorizer.java   |   4 +
 .../commons/auth/entity/PriPrivilegeType.java      |  52 +-
 .../iotdb/commons/auth/role/BasicRoleManager.java  |  92 ++-
 .../iotdb/commons/auth/user/BasicUserManager.java  |  86 +--
 .../udf/builtin/BuiltinAggregationFunction.java    |  20 +-
 .../org/apache/iotdb/commons/utils/AuthUtils.java  |  48 +-
 .../org/apache/iotdb/commons/utils/IOUtils.java    |  28 +-
 ...ReaderTimeseriesMetadataIteratorException.java} |  12 +-
 .../file/metadata/enums/MetadataIndexNodeType.java |   2 +-
 ...leSequenceReaderTimeseriesMetadataIterator.java | 220 +++++++
 ...quenceReaderTimeseriesMetadataIteratorTest.java |  62 ++
 .../thrift-commons/src/main/thrift/common.thrift   |   8 +-
 library-udf/src/assembly/tools/register-UDF.bat    |   1 -
 library-udf/src/assembly/tools/register-UDF.sh     |   1 -
 .../apache/iotdb/library/dprofile/UDAFStddev.java  |  66 --
 143 files changed, 5117 insertions(+), 1711 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index e501d322e53,b51b4284c8c..7b224c996d3
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@@ -72,34 -57,15 +72,29 @@@ import org.apache.thrift.async.AsyncMet
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import javax.annotation.Nullable;
- 
  import java.io.FileNotFoundException;
  import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
  import java.util.Comparator;
  import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
- import java.util.Optional;
- import java.util.PriorityQueue;
  import java.util.concurrent.PriorityBlockingQueue;
  import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicLong;
  import java.util.concurrent.atomic.AtomicReference;
  
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_CONFIG_NODES_KEY;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_KEY;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_KEY;
  import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_SIZE_DEFAULT_VALUE;
 +import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_SIZE_KEY;
  import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
  import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
  
@@@ -116,19 -82,16 +111,20 @@@ public class IoTDBThriftAsyncConnector 
        ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new 
AtomicReference<>();
    private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
        asyncPipeDataTransferClientManager;
 +  private static final AtomicReference<IClientManager<TEndPoint, 
SyncDataNodeInternalServiceClient>>
 +      SYNC_DATA_NODE_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
 +  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
 +      syncDataNodeClientManager;
  
    private final IoTDBThriftSyncConnector retryConnector = new 
IoTDBThriftSyncConnector();
-   private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue =
-       new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
- 
-   private final AtomicLong commitIdGenerator = new AtomicLong(0);
-   private final AtomicLong lastCommitId = new AtomicLong(0);
-   private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
-       new PriorityQueue<>(Comparator.comparing(o -> o.left));
+   private final PriorityBlockingQueue<Event> retryEventQueue =
+       new PriorityBlockingQueue<>(
+           11,
+           Comparator.comparing(
+               e ->
+                   // Non-enriched events will be put at the front of the 
queue,
+                   // because they are more likely to be lost and need to be 
retried first.
+                   e instanceof EnrichedEvent ? ((EnrichedEvent) 
e).getCommitId() : 0));
  
    private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
  
@@@ -392,99 -298,12 +386,97 @@@
        throw new 
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
      }
  
-     final long requestCommitId = commitIdGenerator.incrementAndGet();
      final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler =
-         new PipeTransferTsFileInsertionEventHandler(
-             requestCommitId, pipeTsFileInsertionEvent, this);
+         new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, 
this);
  
-     transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
+     transfer(pipeTsFileInsertionEvent.getCommitId(), 
pipeTransferTsFileInsertionEventHandler);
    }
  
 +  @Override
 +  public void transfer(TsFileBatchInsertionEvent tsFileInsertionEvent) throws 
Exception {
 +    transferQueuedEventsIfNecessary();
 +    transferBatchedEventsIfNecessary();
 +
 +    if (!(tsFileInsertionEvent instanceof PipeBatchTsFileInsertionEvent)) {
 +      LOGGER.warn(
 +          "IoTDBThriftAsyncConnector only support 
PipeBatchTsFileInsertionEvent. Current event: {}.",
 +          tsFileInsertionEvent);
 +      return;
 +    }
 +
 +    if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
 +      try {
 +        for (final TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
 +          transfer(event);
 +        }
 +      } finally {
 +        tsFileInsertionEvent.close();
 +      }
 +      return;
 +    }
 +
 +    final PipeBatchTsFileInsertionEvent pipeTsFileInsertionEvent =
 +        (PipeBatchTsFileInsertionEvent) tsFileInsertionEvent;
 +    pipeTsFileInsertionEvent.waitForTsFileClose();
 +
 +    if (!useLocalSplit) {
 +      List<PipeTsFileInsertionEvent> pipeTsFileInsertionEvents =
 +          pipeTsFileInsertionEvent.toSingleFileEvents();
 +      for (PipeTsFileInsertionEvent fileInsertionEvent : 
pipeTsFileInsertionEvents) {
 +        transfer(fileInsertionEvent);
 +      }
 +    } else {
 +      final long requestCommitId = commitIdGenerator.incrementAndGet();
 +      transfer(requestCommitId, pipeTsFileInsertionEvent);
 +    }
 +  }
 +
 +  private void transfer(
 +      long requestCommitId, PipeBatchTsFileInsertionEvent 
pipeTsFileInsertionEvent)
 +      throws IOException {
 +    LoadTsFileNode loadTsFileNode =
 +        new LoadTsFileNode(
 +            new PlanNodeId("Pipe-" + requestCommitId), 
pipeTsFileInsertionEvent.getResources());
 +
 +    IPartitionFetcher partitionFetcher =
 +        new ExternalPartitionFetcher(targetConfigNodes, thriftClientProperty, 
targetSeriesSlotNum);
 +    DataPartitionBatchFetcher dataPartitionBatchFetcher =
 +        new DataPartitionBatchFetcher(partitionFetcher);
 +    TsFileSplitSender splitSender =
 +        new TsFileSplitSender(
 +            loadTsFileNode,
 +            dataPartitionBatchFetcher,
 +            targetPartitionInterval,
 +            syncDataNodeClientManager,
 +            false,
 +            splitMaxSize,
 +            maxConcurrentFileNum,
 +            targetUserName,
 +            targetPassword);
 +    splitSender.start();
 +    LOGGER.info(
 +        "Sending {} files to {} complete",
 +        pipeTsFileInsertionEvent.getTsFiles().size(),
 +        targetConfigNodes);
 +
 +    if (splitSender.getStatistic().isHasP2Timeout()) {
 +      double throughput = splitSender.getStatistic().p2ThroughputMbps();
 +      Map<String, Object> param = new HashMap<>(2);
 +      param.put(
 +          PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS,
 +          splitSender.getStatistic().getP2Timeout());
 +      param.put(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY, 
throughput);
 +      pipeTsFileInsertionEvent.getExtractorOnConnectorTimeout().apply(param);
 +    } else {
 +      double throughput = splitSender.getStatistic().p2ThroughputMbps();
 +      pipeTsFileInsertionEvent
 +          .getExtractorOnConnectorSuccess()
 +          .apply(
 +              Collections.singletonMap(
 +                  
PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY, throughput));
 +    }
 +  }
 +
    private void transfer(
        long requestCommitId,
        PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler) {
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index f561d38568e,522622bca6c..ef45119441a
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@@ -72,18 -72,19 +72,19 @@@ public class PipeHistoricalDataRegionTs
    private static final Map<Integer, Long> 
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
    private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
  
 -  private String pipeName;
 -  private PipeTaskMeta pipeTaskMeta;
 -  private ProgressIndex startIndex;
++  protected String pipeName;
 +  protected PipeTaskMeta pipeTaskMeta;
 +  protected ProgressIndex startIndex;
  
 -  private int dataRegionId;
 +  protected int dataRegionId;
  
 -  private String pattern;
 +  protected String pattern;
    private boolean isDbNameCoveredByPattern = false;
  
 -  private long historicalDataExtractionStartTime; // Event time
 -  private long historicalDataExtractionEndTime; // Event time
 +  protected long historicalDataExtractionStartTime; // Event time
 +  protected long historicalDataExtractionEndTime; // Event time
  
 -  private long historicalDataExtractionTimeLowerBound; // Arrival time
 +  protected long historicalDataExtractionTimeLowerBound; // Arrival time
  
    private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
  

Reply via email to