This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load_v2 by this push:
new ec0d8aa02e4 add throughput monitor; fix schema registration; add configs
and timers
new 270f6f6efce Merge branch 'load_v2' of github.com:apache/iotdb into
load_v2
ec0d8aa02e4 is described below
commit ec0d8aa02e480dc8ff14072d75348c30a915a2ed
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Nov 7 14:09:23 2023 +0800
add throughput monitor
fix schema registration
add configs and timers
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 9 +
.../confignode/conf/ConfigNodeDescriptor.java | 8 +
.../manager/partition/PartitionManager.java | 62 +++++--
.../thrift/ConfigNodeRPCServiceProcessor.java | 12 +-
.../config/constant/PipeConnectorConstant.java | 2 +
.../config/constant/PipeExtractorConstant.java | 9 +
.../thrift/async/IoTDBThriftAsyncConnector.java | 33 +++-
.../tsfile/PipeBatchTsFileInsertionEvent.java | 40 +++-
.../pipe/extractor/IoTDBDataRegionExtractor.java | 19 +-
.../historical/BatchedTsFileExtractor.java | 48 ++++-
.../extractor/historical/ThroughputMonitor.java | 125 +++++++++++++
.../subtask/connector/PipeConnectorSubtask.java | 4 +
.../iotdb/db/protocol/client/ConfigNodeClient.java | 6 +
.../db/protocol/session/ExternalClientSession.java | 92 ++++++++++
.../iotdb/db/protocol/session/SessionManager.java | 8 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 163 +++++++++++++++--
.../iotdb/db/queryengine/common/QueryId.java | 6 +-
.../execution/load/LoadTsFileManager.java | 53 ++++--
.../execution/load/TsFileSplitSender.java | 203 +++++++++++----------
.../analyze/cache/partition/PartitionCache.java | 23 ++-
.../analyze/partition/BasicPartitionFetcher.java | 4 +-
.../analyze/partition/ClusterPartitionFetcher.java | 3 +-
.../partition/ExternalPartitionFetcher.java | 5 +-
.../plan/analyze/schema/SchemaValidator.java | 23 +++
.../planner/plan/node/load/LoadTsFileNode.java | 8 +
.../execution/load/LoadTsFileManagerTest.java | 7 +-
.../execution/load/TsFileSplitSenderTest.java | 17 +-
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 12 ++
.../org/apache/iotdb/tsfile/utils/TsFileUtils.java | 9 +-
.../src/main/thrift/datanode.thrift | 6 +-
30 files changed, 834 insertions(+), 185 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 277a3513f3c..254d089696f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -301,6 +301,7 @@ public class ConfigNodeConfig {
private boolean isEnablePrintingNewlyCreatedPartition = false;
private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+ private boolean isEnableAutoCreateDatabase = true;
public ConfigNodeConfig() {
// empty constructor
@@ -1232,4 +1233,12 @@ public class ConfigNodeConfig {
long dataRegionRatisPeriodicSnapshotInterval) {
this.dataRegionRatisPeriodicSnapshotInterval =
dataRegionRatisPeriodicSnapshotInterval;
}
+
+ public boolean isEnableAutoCreateDatabase() {
+ return isEnableAutoCreateDatabase;
+ }
+
+ public void setEnableAutoCreateDatabase(boolean enableAutoCreateDatabase) {
+ isEnableAutoCreateDatabase = enableAutoCreateDatabase;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 99cc0e6376d..814f8fcd084 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -429,6 +429,16 @@ public class ConfigNodeDescriptor {
String.valueOf(conf.getProcedureCoreWorkerThreadsCount()))
.trim()));
+ // Dedicated key for the auto-create switch: parsing another property (e.g. a thread count)
+ // as a boolean would silently turn the feature off whenever that property is configured.
+ conf.setEnableAutoCreateDatabase(
+ Boolean.parseBoolean(
+ properties
+ .getProperty(
+ "enable_auto_create_database",
+ String.valueOf(conf.isEnableAutoCreateDatabase()))
+ .trim()));
+
loadRatisConsensusConfig(properties);
loadCQConfig(properties);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index f49fa60f00e..3d57339de64 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -43,6 +43,7 @@ import
org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import
org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
@@ -53,6 +54,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
@@ -81,10 +83,12 @@ import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDelete
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import
org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -212,14 +216,32 @@ public class PartitionManager {
// Check if the related Databases exist
for (String database : req.getPartitionSlotsMap().keySet()) {
if (!isDatabaseExist(database)) {
- return new SchemaPartitionResp(
- new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
- .setMessage(
- String.format(
- "Create SchemaPartition failed because the database:
%s is not exists",
- database)),
- false,
- null);
+ if (!CONF.isEnableAutoCreateDatabase()) {
+ return new SchemaPartitionResp(
+ new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
+ .setMessage(
+ String.format(
+ "Create SchemaPartition failed because the database:
%s is not exists",
+ database)),
+ false,
+ null);
+ } else {
+ TDatabaseSchema databaseSchema = new TDatabaseSchema(database);
+ // A non-null status means the schema was rejected; do not create the database then
+ TSStatus validationStatus =
+ ConfigNodeRPCServiceProcessor.validateDatabaseSchema(databaseSchema);
+ if (validationStatus != null) {
+ return new SchemaPartitionResp(validationStatus, false, null);
+ }
+ DatabaseSchemaPlan databaseSchemaPlan =
+ new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase,
databaseSchema);
+ TSStatus tsStatus =
consensusWritePartitionResult(databaseSchemaPlan);
+ LOGGER.info("Auto create database {}: {}", database, tsStatus);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && !isDatabaseExist(database)) {
+ return new SchemaPartitionResp(tsStatus, false, null);
+ }
+ }
}
}
@@ -335,14 +357,32 @@ public class PartitionManager {
// Check if the related Databases exist
for (String database : req.getPartitionSlotsMap().keySet()) {
if (!isDatabaseExist(database)) {
- return new DataPartitionResp(
- new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
- .setMessage(
- String.format(
- "Create DataPartition failed because the database: %s
is not exists",
- database)),
- false,
- null);
+ if (!CONF.isEnableAutoCreateDatabase()) {
+ return new DataPartitionResp(
+ new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())
+ .setMessage(
+ String.format(
+ "Create DataPartition failed because the database:
%s is not exists",
+ database)),
+ false,
+ null);
+ } else {
+ TDatabaseSchema databaseSchema = new TDatabaseSchema(database);
+ // A non-null status means the schema was rejected; do not create the database then
+ TSStatus validationStatus =
+ ConfigNodeRPCServiceProcessor.validateDatabaseSchema(databaseSchema);
+ if (validationStatus != null) {
+ return new DataPartitionResp(validationStatus, false, null);
+ }
+ DatabaseSchemaPlan databaseSchemaPlan =
+ new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase,
databaseSchema);
+ TSStatus tsStatus =
consensusWritePartitionResult(databaseSchemaPlan);
+ LOGGER.info("Auto create database {}: {}", database, tsStatus);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && !isDatabaseExist(database)) {
+ return new DataPartitionResp(tsStatus, false, null);
+ }
+ }
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8103cffd2a4..843065269d8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -285,8 +285,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.showVariables();
}
- @Override
- public TSStatus setDatabase(TDatabaseSchema databaseSchema) {
+ public static TSStatus validateDatabaseSchema(TDatabaseSchema
databaseSchema) {
TSStatus errorResp = null;
boolean isSystemDatabase =
databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE);
@@ -362,6 +361,15 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
// The maxRegionGroupNum is equal to the minRegionGroupNum when initialize
databaseSchema.setMaxSchemaRegionGroupNum(databaseSchema.getMinSchemaRegionGroupNum());
databaseSchema.setMaxDataRegionGroupNum(databaseSchema.getMinDataRegionGroupNum());
+ return errorResp;
+ }
+
+ @Override
+ public TSStatus setDatabase(TDatabaseSchema databaseSchema) {
+ TSStatus errorResp = validateDatabaseSchema(databaseSchema);
+ if (errorResp != null) {
+ return errorResp;
+ }
DatabaseSchemaPlan setPlan =
new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase,
databaseSchema);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 0d823abe0e7..200778afc9b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -107,6 +107,8 @@ public class PipeConnectorConstant {
public static final int CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE =
16;
public static final String CONNECTOR_EXTERNAL_USER_NAME_KEY =
"connector.external.user-name";
public static final String CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE =
"root";
+ public static final String CONNECTOR_EXTERNAL_USER_PASSWORD_KEY =
"connector.external.password";
+ public static final String CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE =
"root";
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index a1eead9e6f5..19d62b569b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.config.constant;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
+
public class PipeExtractorConstant {
public static final String EXTRACTOR_KEY = "extractor";
@@ -53,6 +55,13 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_REALTIME_MODE_FILE_VALUE = "file";
public static final String EXTRACTOR_REALTIME_MODE_LOG_VALUE = "log";
public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE =
"forced-log";
+ public static final String EXTRACTOR_LOCAL_SPLIT_ENABLE_KEY =
"extractor.local-split.enable";
+ public static final String EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_KEY =
+ "extractor.split.max-concurrent-file";
+ public static final int EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE =
16;
+ public static final String EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_KEY =
+ "extractor.split.max-file-batch-size";
+ public static final long EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_DEFAULT_VALUE =
4 * GB;
private PipeExtractorConstant() {
throw new IllegalStateException("Utility class");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 88e1f4d2b92..09c387b51be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -76,8 +76,11 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -88,6 +91,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_CONFIG_NODES_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_NAME_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXTERNAL_USER_PASSWORD_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
@@ -134,6 +139,7 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
private int splitMaxSize;
private int maxConcurrentFileNum;
private String targetUserName;
+ private String targetPassword;
public IoTDBThriftAsyncConnector() {
if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
@@ -189,6 +195,9 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
targetUserName =
parameters.getStringOrDefault(
CONNECTOR_EXTERNAL_USER_NAME_KEY,
CONNECTOR_EXTERNAL_USER_NAME_DEFAULT_VALUE);
+ targetPassword =
+ parameters.getStringOrDefault(
+ CONNECTOR_EXTERNAL_USER_PASSWORD_KEY,
CONNECTOR_EXTERNAL_USER_PASSWORD_DEFAULT_VALUE);
}
if (isTabletBatchModeEnabled) {
@@ -452,13 +461,29 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
false,
splitMaxSize,
maxConcurrentFileNum,
- targetUserName);
+ targetUserName,
+ targetPassword);
splitSender.start();
LOGGER.info(
- "Sending {} files to {} complete: {}",
+ "Sending {} files to {} complete",
pipeTsFileInsertionEvent.getTsFiles().size(),
- targetConfigNodes,
- splitSender.getStatistic());
+ targetConfigNodes);
+
+ if (splitSender.getStatistic().hasP2Timeout) {
+ double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+ Map<String, Object> param = new HashMap<>(2);
+ param.put(
+ PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS,
splitSender.getStatistic().p2Timeout);
+ param.put(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY,
throughput);
+ pipeTsFileInsertionEvent.getExtractorOnConnectorTimeout().apply(param);
+ } else {
+ double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+ pipeTsFileInsertionEvent
+ .getExtractorOnConnectorSuccess()
+ .apply(
+ Collections.singletonMap(
+ PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY,
throughput));
+ }
}
private void transfer(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
index 72d8b80a292..6c5b222ca42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeBatchTsFileInsertionEvent.java
@@ -37,7 +37,9 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.stream.Collectors;
public class PipeBatchTsFileInsertionEvent extends EnrichedEvent
@@ -59,6 +61,17 @@ public class PipeBatchTsFileInsertionEvent extends
EnrichedEvent
private final AtomicBoolean[] isClosed;
private TsFileListInsertionDataContainer dataContainer;
+ private long totalSize;
+
+ /**
+ * If the event times out when being transferred, the connector uses this
call back to inform the
+ * extractor that the task size should be reduced or to start throttling.
+ */
+ private Function<Map<String, Object>, Void> extractorOnConnectorTimeout;
+
+ private Function<Map<String, Object>, Void> extractorOnConnectorSuccess;
+ public static final String CONNECTOR_THROUGHPUT_MBPS_KEY =
"connector.throughput_MBPS";
+ public static final String CONNECTOR_TIMEOUT_MS = "connector.timeout_MS";
public PipeBatchTsFileInsertionEvent(
List<TsFileResource> resources, boolean isLoaded, boolean
isGeneratedByPipe) {
@@ -86,6 +99,9 @@ public class PipeBatchTsFileInsertionEvent extends
EnrichedEvent
this.resources = resources;
tsFiles =
resources.stream().map(TsFileResource::getTsFile).collect(Collectors.toList());
+ for (TsFileResource resource : resources) {
+ totalSize += resource.getTsFileSize();
+ }
isClosed = new AtomicBoolean[resources.size()];
this.isLoaded = isLoaded;
@@ -242,13 +258,13 @@ public class PipeBatchTsFileInsertionEvent extends
EnrichedEvent
@Override
public String toString() {
- return "PipeTsFileInsertionEvent{"
+ return "PipeBatchTsFileInsertionEvent{"
+ "resources="
+ resources
+ ", tsFiles="
+ tsFiles
- + ", isClosed="
- + isClosed
+ + ", totalSize="
+ + totalSize
+ '}';
}
@@ -272,4 +288,22 @@ public class PipeBatchTsFileInsertionEvent extends
EnrichedEvent
protected boolean isTsFileResourceCoveredByTimeRange(TsFileResource
resource) {
return startTime <= resource.getFileStartTime() && endTime >=
resource.getFileEndTime();
}
+
+ public Function<Map<String, Object>, Void> getExtractorOnConnectorTimeout() {
+ return extractorOnConnectorTimeout;
+ }
+
+ public void setExtractorOnConnectorTimeout(
+ Function<Map<String, Object>, Void> extractorOnConnectorTimeout) {
+ this.extractorOnConnectorTimeout = extractorOnConnectorTimeout;
+ }
+
+ public Function<Map<String, Object>, Void> getExtractorOnConnectorSuccess() {
+ return extractorOnConnectorSuccess;
+ }
+
+ public void setExtractorOnConnectorSuccess(
+ Function<Map<String, Object>, Void> extractorOnConnectorSuccess) {
+ this.extractorOnConnectorSuccess = extractorOnConnectorSuccess;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 927858421d5..94ada46cd08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -49,11 +49,9 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOCAL_SPLIT_ENABLE_KEY;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_LOCAL_SPLIT_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
@@ -61,6 +59,10 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXT
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
@@ -133,13 +135,18 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
private void constructHistoricalExtractor(PipeParameters parameters) {
// Enable historical extractor by default
- if (parameters.getBooleanOrDefault(CONNECTOR_LOCAL_SPLIT_ENABLE_KEY,
false)) {
+ if (parameters.getBooleanOrDefault(EXTRACTOR_LOCAL_SPLIT_ENABLE_KEY,
false)) {
+ LOGGER.info("Use batched extractor.");
historicalExtractor =
new BatchedTsFileExtractor(
parameters.getIntOrDefault(
- CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_KEY,
- CONNECTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE));
+ EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_KEY,
+ EXTRACTOR_SPLIT_MAX_CONCURRENT_FILE_DEFAULT_VALUE),
+ parameters.getLongOrDefault(
+ EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_KEY,
+ EXTRACTOR_SPLIT_MAX_FILE_BATCH_SIZE_DEFAULT_VALUE));
} else {
+ LOGGER.info("Use single extractor.");
historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
index 3a5deeda141..4ed5e894b53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/BatchedTsFileExtractor.java
@@ -29,6 +29,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
/**
* Similar to the base class, but it batches several files as an event to
enable further
@@ -38,9 +41,13 @@ public class BatchedTsFileExtractor extends
PipeHistoricalDataRegionTsFileExtrac
private static final Logger LOGGER =
LoggerFactory.getLogger(BatchedTsFileExtractor.class);
private int maxBatchSize;
+ private long maxFileTotalSize;
+ private ThroughputMonitor throughputMonitor;
- public BatchedTsFileExtractor(int maxBatchSize) {
+ public BatchedTsFileExtractor(int maxBatchSize, long maxFileTotalSize) {
this.maxBatchSize = Math.max(1, maxBatchSize);
+ this.maxFileTotalSize = maxFileTotalSize;
+ this.throughputMonitor = new ThroughputMonitor();
}
@Override
@@ -52,10 +59,16 @@ public class BatchedTsFileExtractor extends
PipeHistoricalDataRegionTsFileExtrac
if (resource == null) {
return null;
}
+ long totalFileSize = 0;
List<TsFileResource> tsFileResourceList = new ArrayList<>(maxBatchSize);
tsFileResourceList.add(resource);
- while (!pendingQueue.isEmpty() && tsFileResourceList.size() <
maxBatchSize) {
- tsFileResourceList.add(resource);
+ totalFileSize = resource.getTsFileSize();
+ while (!pendingQueue.isEmpty()
+ && tsFileResourceList.size() < maxBatchSize
+ && totalFileSize < maxFileTotalSize) {
+ TsFileResource poll = pendingQueue.poll();
+ tsFileResourceList.add(poll);
+ totalFileSize += poll.getTsFileSize();
}
final PipeBatchTsFileInsertionEvent event =
@@ -83,6 +96,35 @@ public class BatchedTsFileExtractor extends
PipeHistoricalDataRegionTsFileExtrac
}
}
+ event.setExtractorOnConnectorTimeout(this::onConnectorTimeout);
+ event.setExtractorOnConnectorSuccess(this::onConnectorSuccess);
+ LOGGER.info("Generated a TsFile event: {}", event);
return event;
}
+
+ public Void onConnectorTimeout(Map<String, Object> parameters) {
+ long timeout = (long)
parameters.get(PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS);
+ double throughput =
+ (double)
parameters.get(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY);
+ throughputMonitor.record(System.currentTimeMillis(), throughput);
+ double avg = throughputMonitor.calculateAverage();
+ LOGGER.info(
+ "Connector timed out: throughput={}MB/s, timeout={}ms,
estimatedThroughput={}MB/s",
+ throughput,
+ timeout,
+ avg);
+ if (!Double.isNaN(avg)) {
+ maxFileTotalSize = (long) (timeout / 1000.0 * avg * 0.9 * MB);
+ LOGGER.info("Connector timed out: newMaxFileSize={}", maxFileTotalSize);
+ }
+ return null;
+ }
+
+ public Void onConnectorSuccess(Map<String, Object> parameters) {
+ double throughput =
+ (double)
parameters.get(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY);
+ LOGGER.info("Connector succeeds with throughput: {}MB/s", throughput);
+ throughputMonitor.record(System.currentTimeMillis(), throughput);
+ return null;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/ThroughputMonitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/ThroughputMonitor.java
new file mode 100644
index 00000000000..792e0c467a3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/ThroughputMonitor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.pipe.extractor.historical;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Keeps a bounded history of {@code <timestamp, throughput>} samples and estimates the typical
+ * throughput from the samples that are recent and not outliers.
+ *
+ * <p>The history is guarded by its own monitor, so samples may be recorded while another thread
+ * computes the average.
+ */
+public class ThroughputMonitor {
+
+  private static final Logger logger = LoggerFactory.getLogger(ThroughputMonitor.class);
+  /** Maximum number of retained samples; the oldest is evicted once the limit is reached. */
+  private final int maxRecordNum = 10000;
+  /** Minimum number of samples needed before an average is reported. */
+  private final int minRecordNumToAnalyze = 5;
+  /** <timestamp, throughput> */
+  private final Queue<Pair<Long, Double>> history = new ArrayDeque<>(maxRecordNum);
+
+  /**
+   * Appends a sample, evicting the oldest one first when the history is full.
+   *
+   * @param timestamp observation time in epoch milliseconds
+   * @param throughput observed throughput
+   */
+  public void record(long timestamp, double throughput) {
+    synchronized (history) {
+      if (history.size() >= maxRecordNum) {
+        history.remove();
+      }
+      history.add(new Pair<>(timestamp, throughput));
+    }
+  }
+
+  /**
+   * Averages the recorded throughput after discarding stale samples and outliers.
+   *
+   * @return the average, or {@link Double#NaN} if too few samples exist before or after purging
+   */
+  public double calculateAverage() {
+    List<Pair<Long, Double>> historyCopy;
+    // ArrayDeque is not thread-safe, so even the size check must see the queue under the lock
+    // that record() holds; work on a snapshot afterwards.
+    synchronized (history) {
+      historyCopy = new ArrayList<>(history);
+    }
+    if (historyCopy.size() < minRecordNumToAnalyze) {
+      logger.info("Only {} histories, do not update", historyCopy.size());
+      return Double.NaN;
+    }
+    logger.info("{} histories before purging", historyCopy.size());
+    historyCopy = purge(historyCopy);
+    logger.info("{} histories after purging", historyCopy.size());
+    if (historyCopy.size() < minRecordNumToAnalyze) {
+      return Double.NaN;
+    }
+    return historyCopy.stream().mapToDouble(p -> p.right).average().orElse(Double.NaN);
+  }
+
+  /** Drops samples that are too old, then outliers, stopping early if too few samples remain. */
+  private List<Pair<Long, Double>> purge(List<Pair<Long, Double>> history) {
+    history = purgeByTimeSpan(history);
+    if (history.size() < minRecordNumToAnalyze) {
+      return history;
+    }
+    return purgeByStderr(history);
+  }
+
+  /**
+   * Repeatedly removes the sample that deviates most from the current average, as long as its
+   * deviation exceeds three times the mean absolute deviation.
+   *
+   * @param history mutable list of samples; modified in place
+   * @return the same list with outliers removed
+   */
+  private List<Pair<Long, Double>> purgeByStderr(List<Pair<Long, Double>> history) {
+    double sum = 0.0;
+    for (Pair<Long, Double> sample : history) {
+      sum += sample.right;
+    }
+
+    while (true) {
+      double average = sum / history.size();
+      double deviationSum = 0.0;
+      int largestDeviationIdx = 0;
+      double largestDeviation = 0.0;
+      for (int i = 0; i < history.size(); i++) {
+        double deviation = Math.abs(history.get(i).right - average);
+        deviationSum += deviation;
+        if (deviation > largestDeviation) {
+          largestDeviationIdx = i;
+          largestDeviation = deviation;
+        }
+      }
+      double deviationAvg = deviationSum / history.size();
+
+      if (largestDeviation > deviationAvg * 3.0) {
+        // Keep the running sum in step with the list; otherwise later rounds would compare
+        // against an average that still includes the removed outliers.
+        sum -= history.remove(largestDeviationIdx).right;
+        if (history.size() < minRecordNumToAnalyze) {
+          return history;
+        }
+      } else {
+        break;
+      }
+    }
+
+    return history;
+  }
+
+  /**
+   * Drops the leading samples older than one hour; relies on samples being in ascending
+   * timestamp order (record() appends them in call order).
+   *
+   * @return a mutable list holding the remaining samples
+   */
+  private List<Pair<Long, Double>> purgeByTimeSpan(List<Pair<Long, Double>> history) {
+    long maxTimeSpan = 3600 * 1000L;
+    long currTime = System.currentTimeMillis();
+    long timeThreshold = currTime - maxTimeSpan;
+    int i = 0;
+    for (; i < history.size(); i++) {
+      if (history.get(i).left >= timeThreshold) {
+        break;
+      }
+    }
+    if (i > 0) {
+      // Copy instead of returning a subList view so later removals act on an independent list.
+      history = new ArrayList<>(history.subList(i, history.size()));
+    }
+    return history;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 05119db0ca2..3b2bcb4cbcf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileBatchInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -122,6 +123,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
} else if (event instanceof TsFileInsertionEvent) {
outputPipeConnector.transfer((TsFileInsertionEvent) event);
PipeConnectorMetrics.getInstance().markTsFileEvent(taskID);
+ } else if (event instanceof TsFileBatchInsertionEvent) {
+ outputPipeConnector.transfer((TsFileBatchInsertionEvent) event);
+ PipeConnectorMetrics.getInstance().markTsFileEvent(taskID);
} else if (event instanceof PipeHeartbeatEvent) {
try {
outputPipeConnector.heartbeat();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 29fd5c4a8a7..e52f8ff0a63 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
@@ -1089,4 +1090,9 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
.orElse(false);
}
}
+
+ /**
+  * Supplier of a {@link ConfigNodeClient} whose {@code supply()} call is allowed to throw the
+  * checked exceptions declared below, which {@code java.util.function.Supplier} cannot express.
+  */
+ @FunctionalInterface
+ public interface ConfigNodeClientProvider {
+ /**
+  * @return a ConfigNodeClient
+  * @throws ClientManagerException declared for implementations that fail to obtain the client
+  * @throws TException declared for implementations that fail to obtain the client
+  */
+ ConfigNodeClient supply() throws ClientManagerException, TException;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ExternalClientSession.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ExternalClientSession.java
new file mode 100644
index 00000000000..701945ecf8e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ExternalClientSession.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.protocol.session;
+
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Client session from another IoTDB cluster, such as the other end of a pipe.
+ *
+ * <p>The session is identified by a client ID only; that ID is reported both as the client
+ * address and as the connection ID. Statement and query IDs are kept in a {@link
+ * ConcurrentHashMap} of {@link CopyOnWriteArraySet}s.
+ */
+public class ExternalClientSession extends IClientSession {
+
+  // Pipe task UUID
+  private final String clientID;
+
+  // query IDs registered under each statement ID of this session
+  private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
+
+  /**
+   * @param clientID the identifier of the external client
+   */
+  public ExternalClientSession(String clientID) {
+    this.clientID = clientID;
+  }
+
+  /** Returns the client ID in place of a network address. */
+  @Override
+  public String getClientAddress() {
+    return clientID;
+  }
+
+  /** Always returns 0. */
+  @Override
+  public int getClientPort() {
+    return 0;
+  }
+
+  /** Always reports {@link TSConnectionType#INTERNAL}. */
+  @Override
+  TSConnectionType getConnectionType() {
+    return TSConnectionType.INTERNAL;
+  }
+
+  /** Returns the client ID. */
+  @Override
+  String getConnectionId() {
+    return clientID;
+  }
+
+  /** Returns the key-set view of the statement map, not a copy. */
+  @Override
+  public Set<Long> getStatementIds() {
+    return statementIdToQueryId.keySet();
+  }
+
+  /** Registers a statement ID with an empty query set unless it is already present. */
+  @Override
+  public void addStatementId(long statementId) {
+    statementIdToQueryId.computeIfAbsent(statementId, sid -> new CopyOnWriteArraySet<>());
+  }
+
+  /**
+   * Removes a statement ID.
+   *
+   * @return the query IDs that were registered under it, or {@code null} if it was unknown
+   */
+  @Override
+  public Set<Long> removeStatementId(long statementId) {
+    return statementIdToQueryId.remove(statementId);
+  }
+
+  /**
+   * Registers a query ID under an existing statement ID.
+   *
+   * @throws IllegalStateException if the statement ID has not been added to this session
+   */
+  @Override
+  public void addQueryId(Long statementId, long queryId) {
+    Set<Long> queryIds = statementIdToQueryId.get(statementId);
+    if (queryIds == null) {
+      throw new IllegalStateException(
+          "StatementId: " + statementId + " doesn't exist in this session " + this);
+    }
+    queryIds.add(queryId);
+  }
+
+  /** Removes a query ID from a statement; does nothing when the statement ID is unknown. */
+  @Override
+  public void removeQueryId(Long statementId, Long queryId) {
+    Set<Long> queryIds = statementIdToQueryId.get(statementId);
+    if (queryIds != null) {
+      queryIds.remove(queryId);
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index eafec00cd8d..8e1a9485d8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -308,9 +308,11 @@ public class SessionManager implements SessionManagerMBean
{
*/
public void removeCurrSession() {
IClientSession session = currSession.get();
- sessions.remove(session);
- currSession.remove();
- currSessionIdleTime.remove();
+ if (session != null) {
+ sessions.remove(session);
+ currSession.remove();
+ currSessionIdleTime.remove();
+ }
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index a2a6289f74f..68b21458c26 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -24,11 +24,14 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -61,11 +64,14 @@ import
org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.protocol.session.ExternalClientSession;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.protocol.thrift.OperationType;
+import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
import org.apache.iotdb.db.queryengine.execution.executor.RegionReadExecutor;
import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
@@ -73,6 +79,10 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceFailur
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
+import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
+import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.ISchemaSource;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -82,6 +92,8 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaC
import
org.apache.iotdb.db.queryengine.plan.analyze.partition.ClusterPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator.ValidatingSchema;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -204,9 +216,13 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import com.google.common.collect.ImmutableList;
@@ -218,6 +234,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -384,27 +401,151 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
throw new UnsupportedOperationException();
}
+ /**
+  * Collects the schema described by the chunks of a piece node: one entry per run of chunks
+  * belonging to the same device, holding the measurement IDs, data types, encodings and
+  * compression types read from the chunk headers, plus whether the last chunk seen for that
+  * device was aligned.
+  *
+  * <p>Modifications are skipped. A chunk whose device and first measurement equal those of the
+  * previously processed chunk is skipped as well, so that consecutive chunks of one series
+  * contribute a single entry.
+  *
+  * @param pieceNode the piece node whose TsFile data is inspected
+  * @return the collected schema; it has no entries when the piece node carries no chunk data
+  * @throws IllegalPathException declared by this method; presumably raised when a device name
+  *     cannot be turned into a {@code PartialPath} - confirm against {@code PartialPath}
+  */
+ private ValidatingSchema extractValidatingSchema(LoadTsFilePieceNode pieceNode)
+     throws IllegalPathException {
+   ValidatingSchema validatingSchema = new ValidatingSchema();
+   String currDevice = null;
+   String currMeasurement = null;
+   List<String> measurements = new ArrayList<>();
+   List<TSDataType> dataTypes = new ArrayList<>();
+   List<TSEncoding> encodings = new ArrayList<>();
+   List<CompressionType> compressionTypes = new ArrayList<>();
+   boolean isAligned = false;
+   for (TsFileData data : pieceNode.getAllTsFileData()) {
+     if (data.isModification()) {
+       continue;
+     }
+
+     ChunkData chunkData = (ChunkData) data;
+     String device = chunkData.getDevice();
+     String measurement = chunkData.firstMeasurement();
+     if (currDevice != null) {
+       if (!currDevice.equals(device)) {
+         // device changed: emit what was accumulated for the previous device and start over
+         appendDeviceToValidatingSchema(
+             validatingSchema,
+             currDevice,
+             measurements,
+             dataTypes,
+             encodings,
+             compressionTypes,
+             isAligned);
+         measurements.clear();
+         dataTypes.clear();
+         encodings.clear();
+         compressionTypes.clear();
+       } else if (currMeasurement.equals(measurement)) {
+         // another chunk of the series that was just recorded
+         continue;
+       }
+     }
+     // always track the chunk being recorded; without updating currMeasurement here, repeated
+     // chunks of any measurement after the first one of a device would be recorded twice
+     currDevice = device;
+     currMeasurement = measurement;
+
+     if (chunkData.isAligned()) {
+       AlignedChunkData alignedChunkData = (AlignedChunkData) chunkData;
+       for (ChunkHeader header : alignedChunkData.getChunkHeaderList()) {
+         measurements.add(header.getMeasurementID());
+         dataTypes.add(header.getDataType());
+         encodings.add(header.getEncodingType());
+         compressionTypes.add(header.getCompressionType());
+       }
+       isAligned = true;
+     } else {
+       NonAlignedChunkData nonAlignedChunkData = (NonAlignedChunkData) chunkData;
+       ChunkHeader header = nonAlignedChunkData.getChunkHeader();
+       measurements.add(header.getMeasurementID());
+       dataTypes.add(header.getDataType());
+       encodings.add(header.getEncodingType());
+       compressionTypes.add(header.getCompressionType());
+       isAligned = false;
+     }
+   }
+
+   // currDevice is still null when the piece held no chunk data (e.g. only modifications);
+   // in that case there is nothing to emit and no PartialPath must be built from null
+   if (currDevice != null) {
+     appendDeviceToValidatingSchema(
+         validatingSchema,
+         currDevice,
+         measurements,
+         dataTypes,
+         encodings,
+         compressionTypes,
+         isAligned);
+   }
+   return validatingSchema;
+ }
+
+ /** Appends one device entry to the schema, copying the accumulated lists into arrays. */
+ private static void appendDeviceToValidatingSchema(
+     ValidatingSchema validatingSchema,
+     String device,
+     List<String> measurements,
+     List<TSDataType> dataTypes,
+     List<TSEncoding> encodings,
+     List<CompressionType> compressionTypes,
+     boolean isAligned)
+     throws IllegalPathException {
+   validatingSchema.devicePaths.add(new PartialPath(device));
+   validatingSchema.measurements.add(measurements.toArray(new String[0]));
+   validatingSchema.dataTypes.add(dataTypes.toArray(new TSDataType[0]));
+   validatingSchema.encodings.add(encodings.toArray(new TSEncoding[0]));
+   validatingSchema.compressionTypes.add(compressionTypes.toArray(new CompressionType[0]));
+   validatingSchema.isAlignedList.add(isAligned);
+ }
+
+
@Override
- public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) {
- LOGGER.info("Receive load node from uuid {}.", req.uuid);
+ public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) throws TException {
+ LOGGER.debug("Receive load node from uuid {} of {}.", req.uuid,
req.consensusGroupId);
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
- LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode)
PlanNodeType.deserialize(req.body);
- if (pieceNode == null) {
- return createTLoadResp(
- new
TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
- }
- TSStatus resultStatus =
- StorageEngine.getInstance()
- .writeLoadTsFileNode((DataRegionId) groupId, pieceNode, req.uuid);
+ IClientSession session = new ExternalClientSession(req.uuid);
+ try {
+ ByteBuffer buffer =
+ req.isSetCompressionType()
+ ? TsFileUtils.uncompress(
+ req.uncompressedLength,
+ CompressionType.deserialize(req.compressionType),
+ req.body)
+ : req.body.slice();
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode)
PlanNodeType.deserialize(buffer);
+ if (pieceNode == null) {
+ return createTLoadResp(
+ new
TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
+ }
+
+ if (req.needSchemaRegistration
+ &&
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ TSStatus status = AuthorityChecker.checkUser(req.getUsername(),
req.getPassword());
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return createTLoadResp(status);
+ }
- return createTLoadResp(resultStatus);
+ try {
+ ValidatingSchema validatingSchema =
extractValidatingSchema(pieceNode);
+ SESSION_MANAGER.registerSession(session);
+ SESSION_MANAGER.supplySession(
+ session, req.getUsername(), ZoneId.systemDefault().getId(),
ClientVersion.V_1_0);
+
+ MPPQueryContext queryContext =
+ new MPPQueryContext(
+ "",
+ COORDINATOR.createQueryId(),
+ SESSION_MANAGER.getSessionInfo(session),
+ DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
+ DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT);
+ SchemaValidator.validate(schemaFetcher, validatingSchema,
queryContext);
+ } catch (IllegalPathException e) {
+ throw new TException(e);
+ }
+ }
+
+ TSStatus resultStatus =
+ StorageEngine.getInstance()
+ .writeLoadTsFileNode((DataRegionId) groupId, pieceNode,
req.uuid);
+
+ if (req.relayTargets != null) {
+ TRegionReplicaSet replicaSet = req.relayTargets;
+ req.relayTargets = null;
+ req.needSchemaRegistration = false;
+ for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
+ TEndPoint internalEndPoint = dataNodeLocation.getInternalEndPoint();
+ try (SyncDataNodeInternalServiceClient client =
+
COORDINATOR.getInternalServiceClientManager().borrowClient(internalEndPoint)) {
+ client.sendTsFilePieceNode(req);
+ }
+ }
+ }
+ return createTLoadResp(resultStatus);
+ } catch (ClientManagerException | IOException e) {
+ throw new TException(e);
+ } finally {
+ SESSION_MANAGER.closeSession(session,
COORDINATOR::cleanupQueryExecution);
+ SESSION_MANAGER.removeCurrSession();
+ }
}
@Override
public TLoadResp sendLoadCommand(TLoadCommandReq req) {
+ LOGGER.info("Receive load command for {}", req.getUuid());
+
return createTLoadResp(
StorageEngine.getInstance()
.executeLoadCommand(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
index 307a7e9d9e5..33317ffb895 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
@@ -121,7 +121,11 @@ public class QueryId {
for (int i = 0; i < id.length(); i++) {
char c = id.charAt(i);
- if (!(c == '_' || c >= 'a' && c <= 'z' || c >= '0' && c <= '9')) {
+ if (!(c == '_'
+ || c >= 'A' && c <= 'Z'
+ || c >= 'a' && c <= 'z'
+ || c >= '0' && c <= '9'
+ || c == '-')) {
return false;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 5baf1de9314..04b8fdb7c6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -50,6 +50,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -76,6 +77,7 @@ public class LoadTsFileManager {
private final ScheduledExecutorService cleanupExecutors;
private final Map<String, ScheduledFuture<?>> uuid2Future;
+ private final Set<String> completedTasks;
public LoadTsFileManager() {
this.loadDir =
SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir());
@@ -83,6 +85,7 @@ public class LoadTsFileManager {
this.cleanupExecutors =
IoTDBThreadPoolFactory.newScheduledThreadPool(1,
LoadTsFileManager.class.getName());
this.uuid2Future = new ConcurrentHashMap<>();
+ this.completedTasks = new ConcurrentSkipListSet<>();
recover();
}
@@ -137,16 +140,24 @@ public class LoadTsFileManager {
public boolean loadAll(String uuid, boolean isGeneratedByPipe)
throws IOException, LoadFileException {
- if (!uuid2WriterManager.containsKey(uuid)) {
+ TsFileWriterManager tsFileWriterManager = uuid2WriterManager.get(uuid);
+ if (completedTasks.contains(uuid)) {
+ return true;
+ }
+ if (tsFileWriterManager == null) {
return false;
}
- uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe);
+ tsFileWriterManager.loadAll(isGeneratedByPipe);
clean(uuid);
return true;
}
public boolean deleteAll(String uuid) {
- if (!uuid2WriterManager.containsKey(uuid)) {
+ TsFileWriterManager tsFileWriterManager = uuid2WriterManager.get(uuid);
+ if (completedTasks.contains(uuid)) {
+ return true;
+ }
+ if (tsFileWriterManager == null) {
return false;
}
clean(uuid);
@@ -154,10 +165,17 @@ public class LoadTsFileManager {
}
private void clean(String uuid) {
- uuid2WriterManager.get(uuid).close();
- uuid2WriterManager.remove(uuid);
- uuid2Future.get(uuid).cancel(true);
- uuid2Future.remove(uuid);
+ // record completed tasks for second phase retry
+ completedTasks.add(uuid);
+
+ TsFileWriterManager writerManager = uuid2WriterManager.remove(uuid);
+ if (writerManager != null) {
+ writerManager.close();
+ }
+ ScheduledFuture<?> future = uuid2Future.remove(uuid);
+ if (future != null) {
+ future.cancel(true);
+ }
final Path loadDirPath = loadDir.toPath();
if (!Files.exists(loadDirPath)) {
@@ -267,20 +285,25 @@ public class LoadTsFileManager {
receivedSplitIds.add(deletionData.getSplitId());
}
- private void loadAll(boolean isGeneratedByPipe) throws IOException,
LoadFileException {
+ private synchronized void loadAll(boolean isGeneratedByPipe)
+ throws IOException, LoadFileException {
if (isClosed) {
- throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
+ // this command is a retry and the previous command has loaded the task
+ // treat this one as a success
+ return;
}
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
dataPartition2Writer.entrySet()) {
TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
- writer.endFile();
- entry
- .getKey()
- .getDataRegion()
- .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
+ if (writer.canWrite()) {
+ writer.endFile();
+ entry
+ .getKey()
+ .getDataRegion()
+ .loadNewTsFile(generateResource(writer), true,
isGeneratedByPipe);
+ }
}
}
@@ -290,7 +313,7 @@ public class LoadTsFileManager {
return tsFileResource;
}
- private void close() {
+ private synchronized void close() {
if (isClosed) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index d6f0ac87e79..50686d37b8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,9 +67,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
import static
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
public class TsFileSplitSender {
@@ -84,13 +87,13 @@ public class TsFileSplitSender {
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
// All consensus groups accessed in Phase1 should be notified in Phase2
- private final Set<TRegionReplicaSet> allReplicaSets = new
ConcurrentSkipListSet<>();
+ private final Set<TDataNodeLocation> allReplicaSets = new
ConcurrentSkipListSet<>();
private String uuid;
private LocationStatistics locationStatistics = new LocationStatistics();
private boolean isGeneratedByPipe;
private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception>
phaseOneFailures =
new ConcurrentHashMap<>();
- private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>();
+ // filled by the load-command tasks submitted in secondPhase and by the thread waiting for
+ // them, so a thread-safe map is required (as for phaseOneFailures)
+ private Map<TDataNodeLocation, Exception> phaseTwoFailures = new ConcurrentHashMap<>();
private long maxSplitSize;
private PieceNodeSplitter pieceNodeSplitter = new
ClusteringMeasurementSplitter(1.0, 10);
// private PieceNodeSplitter pieceNodeSplitter = new
OrderedMeasurementSplitter();
@@ -100,6 +103,7 @@ public class TsFileSplitSender {
private Queue<Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet>>
splitFutures;
private int maxConcurrentFileNum;
private String userName;
+ private String password;
public TsFileSplitSender(
LoadTsFileNode loadTsFileNode,
@@ -109,7 +113,8 @@ public class TsFileSplitSender {
boolean isGeneratedByPipe,
long maxSplitSize,
int maxConcurrentFileNum,
- String userName) {
+ String userName,
+ String password) {
this.loadTsFileNode = loadTsFileNode;
this.targetPartitionFetcher = targetPartitionFetcher;
this.targetPartitionInterval = targetPartitionInterval;
@@ -120,6 +125,9 @@ public class TsFileSplitSender {
this.splitFutures = new ArrayDeque<>(MAX_PENDING_PIECE_NODE);
this.maxConcurrentFileNum = maxConcurrentFileNum;
this.userName = userName;
+ this.password = password;
+
+ this.statistic.totalSize = loadTsFileNode.getTotalSize();
}
public void start() throws IOException {
@@ -127,6 +135,7 @@ public class TsFileSplitSender {
// skip files without data
loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
uuid = UUID.randomUUID().toString();
+ logger.info("Start to split {}", loadTsFileNode);
boolean isFirstPhaseSuccess = firstPhase(loadTsFileNode);
boolean isSecondPhaseSuccess = secondPhase(isFirstPhaseSuccess);
@@ -176,58 +185,79 @@ public class TsFileSplitSender {
tsFileDataManager.sendAllTsFileData()
&& processRemainingPieceNodes()
&& phaseOneFailures.isEmpty();
- logger.info("Cleanup ends after {}ms", System.currentTimeMillis() - start);
+ statistic.p1TimeMs = System.currentTimeMillis() - start;
+ logger.info("Cleanup ends after {}ms", statistic.p1TimeMs);
return success;
}
private boolean secondPhase(boolean isFirstPhaseSuccess) {
- logger.info("Start dispatching Load command for uuid {}", uuid);
TLoadCommandReq loadCommandReq =
new TLoadCommandReq(
(isFirstPhaseSuccess ? LoadCommand.EXECUTE :
LoadCommand.ROLLBACK).ordinal(), uuid);
loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
- for (TRegionReplicaSet replicaSet : allReplicaSets) {
- loadCommandReq.setUseConsensus(true);
- for (TDataNodeLocation dataNodeLocation :
replicaSet.getDataNodeLocations()) {
- TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
- Exception groupException = null;
- loadCommandReq.setConsensusGroupId(replicaSet.getRegionId());
-
- for (int i = 0; i < MAX_RETRY; i++) {
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
- if (!loadResp.isAccepted()) {
- logger.warn(loadResp.message);
- groupException = new
FragmentInstanceDispatchException(loadResp.status);
- }
- break;
- } catch (ClientManagerException | TException e) {
- logger.warn(NODE_CONNECTION_ERROR, endPoint, e);
- TSStatus status = new TSStatus();
- status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage(
- "can't connect to node {}, please reset longer
dn_connection_timeout_ms "
- + "in iotdb-common.properties and restart iotdb."
- + endPoint);
- groupException = new FragmentInstanceDispatchException(status);
- }
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- groupException = e;
- break;
- }
- }
-
- if (groupException != null) {
- phaseTwoFailures.put(replicaSet, groupException);
- } else {
- break;
- }
+ long p2StartMS = System.currentTimeMillis();
+ List<Pair<TDataNodeLocation, Future<Void>>> loadFutures = new
ArrayList<>();
+ AtomicBoolean hasTimeout = new AtomicBoolean();
+ for (TDataNodeLocation dataNodeLocation : allReplicaSets) {
+ loadFutures.add(
+ new Pair<>(
+ dataNodeLocation,
+ splitNodeService.submit(
+ () -> {
+ logger.info(
+ "Start dispatching Load command for uuid {} to {}",
uuid, dataNodeLocation);
+ TEndPoint endPoint =
dataNodeLocation.getInternalEndPoint();
+ Exception locationException = null;
+
+ for (int i = 0; i < MAX_RETRY; i++) {
+ long timeout = 0;
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint))
{
+ timeout = client.getTimeout();
+ TLoadResp loadResp =
client.sendLoadCommand(loadCommandReq);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ locationException =
+ new
FragmentInstanceDispatchException(loadResp.status);
+ } else {
+ locationException = null;
+ }
+ break;
+ } catch (ClientManagerException | TException e) {
+ TSStatus status = new TSStatus();
+
status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage(
+ "can't connect to node {}, please reset longer
dn_connection_timeout_ms "
+ + "in iotdb-common.properties and restart
iotdb."
+ + endPoint);
+ locationException = new
FragmentInstanceDispatchException(status);
+ hasTimeout.set(true);
+ statistic.p2Timeout = timeout;
+ }
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ locationException = e;
+ break;
+ }
+ }
+
+ if (locationException != null) {
+ phaseTwoFailures.put(dataNodeLocation,
locationException);
+ }
+ return null;
+ })));
+ }
+ for (Pair<TDataNodeLocation, Future<Void>> loadFuture : loadFutures) {
+ try {
+ loadFuture.right.get();
+ } catch (InterruptedException | ExecutionException e) {
+ phaseTwoFailures.put(loadFuture.left, e);
}
}
+ statistic.p2TimeMS = System.currentTimeMillis() - p2StartMS;
+ statistic.hasP2Timeout = hasTimeout.get();
return phaseTwoFailures.isEmpty();
}
@@ -280,10 +310,6 @@ public class TsFileSplitSender {
private boolean dispatchPieceNodes(
List<LoadTsFilePieceNode> subNodes, TRegionReplicaSet replicaSet) {
- AtomicLong minDispatchTime = new AtomicLong(Long.MAX_VALUE);
- AtomicLong maxDispatchTime = new AtomicLong(Long.MIN_VALUE);
- AtomicLong sumDispatchTime = new AtomicLong();
- AtomicLong sumCompressingTime = new AtomicLong();
long start = System.nanoTime();
List<Boolean> subNodeResults =
@@ -303,12 +329,12 @@ public class TsFileSplitSender {
return false;
}
long compressingTime = System.nanoTime() - startTime;
- sumCompressingTime.addAndGet(compressingTime);
- statistic.compressingTime.addAndGet(compressingTime);
+ statistic.compressingTimeNs.addAndGet(compressingTime);
TTsFilePieceReq loadTsFileReq =
new TTsFilePieceReq(buffer, uuid,
replicaSet.getRegionId());
- loadTsFileReq.isRelay = true;
+ loadTsFileReq.setUsername(userName);
+ loadTsFileReq.setPassword(password);
loadTsFileReq.setCompressionType(compressionType.serialize());
loadTsFileReq.setUncompressedLength(uncompressedLength);
LocationSequencer locationSequencer =
createLocationSequencer(replicaSet);
@@ -317,6 +343,11 @@ public class TsFileSplitSender {
Exception lastConnectionError = null;
TDataNodeLocation currLocation = null;
for (TDataNodeLocation location : locationSequencer) {
+ TRegionReplicaSet relaySet = new
TRegionReplicaSet(replicaSet);
+ relaySet.getDataNodeLocations().remove(location);
+ loadTsFileReq.setRelayTargets(relaySet);
+ loadTsFileReq.setNeedSchemaRegistration(true);
+
if (location.getDataNodeId() == 0 &&
logger.isDebugEnabled()) {
locationStatistics.logLocationStatistics();
logger.info("Chose location {}",
location.getDataNodeId());
@@ -370,52 +401,22 @@ public class TsFileSplitSender {
locationStatistics.updateThroughput(
currLocation, node.getDataSize(), timeConsumption);
- synchronized (maxDispatchTime) {
- if (maxDispatchTime.get() < timeConsumption) {
- maxDispatchTime.set(timeConsumption);
- }
- }
- synchronized (minDispatchTime) {
- if (minDispatchTime.get() > timeConsumption) {
- minDispatchTime.set(timeConsumption);
- }
- }
- sumDispatchTime.addAndGet(timeConsumption);
return true;
})
.collect(Collectors.toList());
long elapsedTime = System.nanoTime() - start;
- statistic.dispatchNodesTime.addAndGet(elapsedTime);
- logger.debug(
- "Dispatched one node after {}ms, min/maxDispatching time: {}/{}ns,
avg: {}ns, sum: {}ns, compressing: {}ns",
- elapsedTime / 1_000_000L,
- minDispatchTime.get(),
- maxDispatchTime.get(),
- sumDispatchTime.get() / subNodes.size(),
- sumDispatchTime.get(),
- sumCompressingTime.get());
+ statistic.dispatchNodesTimeNS.addAndGet(elapsedTime);
return !subNodeResults.contains(false);
}
public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode,
TRegionReplicaSet replicaSet) {
long allStart = System.nanoTime();
- allReplicaSets.add(replicaSet);
- if (false) {
- long start = System.nanoTime();
- List<LoadTsFilePieceNode> split = pieceNodeSplitter.split(pieceNode);
- long elapsedTime = System.nanoTime() - start;
- statistic.splitTime.addAndGet(elapsedTime);
- statistic.pieceNodeNum.incrementAndGet();
- logger.debug("{} splits are generated after {}ms", split.size(),
elapsedTime / 1_000_000L);
- boolean success = dispatchPieceNodes(split, replicaSet);
- statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
- return success;
- }
+ allReplicaSets.addAll(replicaSet.dataNodeLocations);
List<LoadTsFilePieceNode> subNodes;
if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode),
replicaSet));
- statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
return true;
} else {
long start = System.nanoTime();
@@ -434,7 +435,7 @@ public class TsFileSplitSender {
return false;
}
boolean success = dispatchPieceNodes(subNodes, pair.right);
- statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
return success;
}
}
@@ -447,24 +448,42 @@ public class TsFileSplitSender {
public AtomicLong compressedSize = new AtomicLong();
public AtomicLong splitTime = new AtomicLong();
public AtomicLong pieceNodeNum = new AtomicLong();
- public AtomicLong dispatchNodesTime = new AtomicLong();
- public AtomicLong dispatchNodeTime = new AtomicLong();
- public AtomicLong compressingTime = new AtomicLong();
+ public AtomicLong dispatchNodesTimeNS = new AtomicLong();
+ public AtomicLong dispatchNodeTimeNS = new AtomicLong();
+ public AtomicLong compressingTimeNs = new AtomicLong();
+ public long p1TimeMs;
+ public long p2TimeMS;
+ public long totalSize;
+ public boolean hasP2Timeout;
+ public long p2Timeout;
public void logStatistic() {
- logger.info("Time consumption: {}ms", taskEndTime - taskStartTime);
+ logger.info(
+ "Time consumption: {}ms, totalSize: {}MB",
+ taskEndTime - taskStartTime,
+ totalSize * 1.0 / MB);
logger.info(
"Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {},
dispatchNodeTime: {}",
pieceNodeNum.get(),
splitTime.get() / 1_000_000L,
- dispatchNodesTime.get() / 1_000_000L,
- dispatchNodeTime.get() / 1_000_000L);
+ dispatchNodesTimeNS.get() / 1_000_000L,
+ dispatchNodeTimeNS.get() / 1_000_000L);
logger.info(
"Transmission size: {}/{} ({}), compressionTime: {}ms",
compressedSize.get(),
rawSize.get(),
compressedSize.get() * 1.0 / rawSize.get(),
- compressingTime.get() / 1_000_000L);
+ compressingTimeNs.get() / 1_000_000L);
+ logger.info("Sync TsFile time: {}ms ({})", p1TimeMs, p1ThroughputMBPS());
+ logger.info("Load command execution time: {}ms ({})", p2TimeMS,
p2ThroughputMBPS());
+ }
+
+ public double p2ThroughputMBPS() {
+ return totalSize * 1.0 / MB / (p2TimeMS / 1000.0);
+ }
+
+ public double p1ThroughputMBPS() {
+ return totalSize * 1.0 / MB / (p1TimeMs / 1000.0); // MB per second; p1TimeMs is in ms
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 308d7ea221b..e3b7d7b0615 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -25,9 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.DataPartition;
@@ -50,6 +48,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import
org.apache.iotdb.db.protocol.client.ConfigNodeClient.ConfigNodeClientProvider;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils;
@@ -75,6 +74,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PartitionCache {
+
private static final Logger logger =
LoggerFactory.getLogger(PartitionCache.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private static final List<String> ROOT_PATH = Arrays.asList("root", "**");
@@ -107,8 +107,8 @@ public class PartitionCache {
private final ReentrantReadWriteLock regionReplicaSetLock = new
ReentrantReadWriteLock();
- private final IClientManager<ConfigRegionId, ConfigNodeClient>
configNodeClientManager =
- ConfigNodeClientManager.getInstance();
+ private ConfigNodeClientProvider configNodeClientProvider =
+ () ->
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
private final CacheMetrics cacheMetrics;
public PartitionCache() {
@@ -120,6 +120,11 @@ public class PartitionCache {
this.cacheMetrics = new CacheMetrics();
}
+ public PartitionCache(ConfigNodeClientProvider configNodeClientProvider) {
+ this();
+ this.configNodeClientProvider = configNodeClientProvider;
+ }
+
// region database cache
/**
@@ -191,8 +196,7 @@ public class PartitionCache {
private void fetchStorageGroupAndUpdateCache(
StorageGroupCacheResult<?> result, List<String> devicePaths)
throws ClientManagerException, TException {
- try (ConfigNodeClient client =
- configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ try (ConfigNodeClient client = configNodeClientProvider.supply()) {
storageGroupCacheLock.writeLock().lock();
result.reset();
getStorageGroupMap(result, devicePaths, true);
@@ -222,8 +226,7 @@ public class PartitionCache {
private void createStorageGroupAndUpdateCache(
StorageGroupCacheResult<?> result, List<String> devicePaths, String
userName)
throws ClientManagerException, MetadataException, TException {
- try (ConfigNodeClient client =
- configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ try (ConfigNodeClient client = configNodeClientProvider.supply()) {
storageGroupCacheLock.writeLock().lock();
// try to check whether database need to be created
result.reset();
@@ -416,6 +419,7 @@ public class PartitionCache {
// endregion
// region replicaSet cache
+
/**
* get regionReplicaSet from local and confignode
*
@@ -439,8 +443,7 @@ public class PartitionCache {
regionReplicaSetLock.writeLock().lock();
// verify that there are not hit in cache
if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
- try (ConfigNodeClient client =
-
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ try (ConfigNodeClient client = configNodeClientProvider.supply()) {
TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
resp.getStatus().getCode()) {
updateGroupIdToReplicaSetMap(resp.getTimestamp(),
resp.getRegionRouteMap());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
index 8536457fbf2..98e50f2720b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
@@ -66,11 +66,11 @@ public abstract class BasicPartitionFetcher implements
IPartitionFetcher {
protected final PartitionCache partitionCache;
- protected BasicPartitionFetcher(int seriesPartitionSlotNum) {
+ protected BasicPartitionFetcher(int seriesPartitionSlotNum, PartitionCache
partitionCache) {
this.partitionExecutor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
config.getSeriesPartitionExecutorClass(), seriesPartitionSlotNum);
- this.partitionCache = new PartitionCache();
+ this.partitionCache = partitionCache;
}
protected abstract ConfigNodeClient getClient() throws
ClientManagerException, TException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
index f8d218bf71f..b38346c150c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache;
public class ClusterPartitionFetcher extends BasicPartitionFetcher {
private final IClientManager<ConfigRegionId, ConfigNodeClient>
configNodeClientManager =
@@ -42,7 +43,7 @@ public class ClusterPartitionFetcher extends
BasicPartitionFetcher {
}
private ClusterPartitionFetcher() {
- super(config.getSeriesPartitionSlotNum());
+ super(config.getSeriesPartitionSlotNum(), new PartitionCache());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
index ac1624c85c6..e9e7adb53b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ExternalPartitionFetcher.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.analyze.partition;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache;
import org.apache.thrift.TException;
@@ -40,7 +41,9 @@ public class ExternalPartitionFetcher extends
BasicPartitionFetcher {
List<TEndPoint> externalConfigNodes,
ThriftClientProperty property,
int seriesPartitionSlotNum) {
- super(seriesPartitionSlotNum);
+ super(
+ seriesPartitionSlotNum,
+ new PartitionCache(() -> new ConfigNodeClient(externalConfigNodes,
property, null)));
this.externalConfigNodes = externalConfigNodes;
this.property = property;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 0558d375988..78bf3023943 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import java.util.ArrayList;
import java.util.List;
public class SchemaValidator {
@@ -66,4 +67,26 @@ public class SchemaValidator {
return schemaFetcher.fetchSchemaListWithAutoCreate(
devicePaths, measurements, dataTypes, encodings, compressionTypes,
isAlignedList, context);
}
+
+ public static ISchemaTree validate(
+ ISchemaFetcher schemaFetcher, ValidatingSchema validatingSchema,
MPPQueryContext context) {
+ return schemaFetcher.fetchSchemaListWithAutoCreate(
+ validatingSchema.devicePaths,
+ validatingSchema.measurements,
+ validatingSchema.dataTypes,
+ validatingSchema.encodings,
+ validatingSchema.compressionTypes,
+ validatingSchema.isAlignedList,
+ context);
+ }
+
+ public static class ValidatingSchema {
+
+ public List<PartialPath> devicePaths = new ArrayList<>();
+ public List<String[]> measurements = new ArrayList<>();
+ public List<TSDataType[]> dataTypes = new ArrayList<>();
+ public List<TSEncoding[]> encodings = new ArrayList<>();
+ public List<CompressionType[]> compressionTypes = new ArrayList<>();
+ public List<Boolean> isAlignedList = new ArrayList<>();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 6e9cc5bee33..808b97ab465 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -39,10 +39,14 @@ import java.util.Objects;
public class LoadTsFileNode extends WritePlanNode {
private final List<TsFileResource> resources;
+ private long totalSize;
public LoadTsFileNode(PlanNodeId id, List<TsFileResource> resources) {
super(id);
this.resources = resources;
+ for (TsFileResource resource : resources) {
+ totalSize += resource.getTsFileSize();
+ }
}
@Override
@@ -119,4 +123,8 @@ public class LoadTsFileNode extends WritePlanNode {
public TsFileResource lastResource() {
return resources.get(resources.size() - 1);
}
+
+ public long getTotalSize() {
+ return totalSize;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
index 2e2db3dd13e..249532770e6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@ -87,6 +87,7 @@ public class LoadTsFileManagerTest extends TestBase {
false,
maxSplitSize,
100,
+ "root",
"root");
long start = System.currentTimeMillis();
splitSender.start();
@@ -156,9 +157,9 @@ public class LoadTsFileManagerTest extends TestBase {
dataRegionMap.get(req.consensusGroupId), pieceNode, req.uuid);
// forward to other replicas in the group
- if (req.isRelay) {
- req.isRelay = false;
- TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+ if (req.relayTargets != null) {
+ TRegionReplicaSet regionReplicaSet = req.relayTargets;
+ req.relayTargets = null;
regionReplicaSet.getDataNodeLocations().stream()
.parallel()
.forEach(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 3f605a36247..bd903c45548 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -120,6 +120,7 @@ public class TsFileSplitSenderTest extends TestBase {
false,
maxSplitSize,
100,
+ "root",
"root");
long start = System.currentTimeMillis();
splitSender.start();
@@ -143,16 +144,16 @@ public class TsFileSplitSenderTest extends TestBase {
long handleStart = System.nanoTime();
if ((tEndpoint.getPort() - 10000) % 3 == 0
&& random.nextDouble() < packetLossRatio
- && req.isRelay) {
+ && req.relayTargets != null) {
throw new TException("Packet lost");
}
if ((tEndpoint.getPort() - 10000) % 3 == 1
&& random.nextDouble() < packetLossRatio / 2
- && req.isRelay) {
+ && req.relayTargets != null) {
throw new TException("Packet lost");
}
- if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay &&
stuckDurationMS > 0) {
+ if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null &&
stuckDurationMS > 0) {
Pair<Long, Long> nextStuckTime =
nextStuckTimeMap.computeIfAbsent(
tEndpoint,
@@ -216,14 +217,14 @@ public class TsFileSplitSenderTest extends TestBase {
.collect(Collectors.toList()));
if (dummyDelayMS > 0) {
- if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
+ if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null) {
try {
Thread.sleep(dummyDelayMS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
- if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.isRelay) {
+ if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.relayTargets != null) {
try {
Thread.sleep(dummyDelayMS / 2);
} catch (InterruptedException e) {
@@ -233,10 +234,10 @@ public class TsFileSplitSenderTest extends TestBase {
}
// forward to other replicas in the group
- if (req.isRelay) {
+ if (req.relayTargets != null) {
long relayStart = System.nanoTime();
- req.isRelay = false;
- TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+ TRegionReplicaSet regionReplicaSet = req.relayTargets;
+ req.relayTargets = null;
regionReplicaSet.getDataNodeLocations().stream()
.parallel()
.forEach(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index e5a80f681be..edf464badae 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Date;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,9 +58,14 @@ public class PipeTaskMeta {
private final Map<PipeRuntimeException, PipeRuntimeException>
exceptionMessages =
new ConcurrentHashMap<>();
+ private long createTime;
+ private long lastReportTime;
+
public PipeTaskMeta(/* @NotNull */ ProgressIndex progressIndex, int
leaderDataNodeId) {
this.progressIndex.set(progressIndex);
this.leaderDataNodeId.set(leaderDataNodeId);
+ this.createTime = System.currentTimeMillis();
+ this.lastReportTime = System.currentTimeMillis();
}
public ProgressIndex getProgressIndex() {
@@ -67,6 +73,7 @@ public class PipeTaskMeta {
}
public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
+ this.lastReportTime = System.currentTimeMillis();
return progressIndex.updateAndGet(
index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
}
@@ -176,6 +183,11 @@ public class PipeTaskMeta {
+ leaderDataNodeId
+ ", exceptionMessages='"
+ exceptionMessages
+ + ", lastReportSince "
+ + new Date(createTime)
+ + " ("
+ + (lastReportTime - createTime) / 1000
+ + "s)"
+ "'}";
}
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
index a84c735e549..c54028d513c 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
@@ -62,11 +62,16 @@ public class TsFileUtils {
public static ByteBuffer uncompressPage(
PageHeader header, CompressionType type, ByteBuffer buffer) throws
IOException {
- if (header.getUncompressedSize() == 0 || type ==
CompressionType.UNCOMPRESSED) {
+ return uncompress(header.getUncompressedSize(), type, buffer);
+ }
+
+ public static ByteBuffer uncompress(int uncompressedSize, CompressionType
type, ByteBuffer buffer)
+ throws IOException {
+ if (uncompressedSize == 0 || type == CompressionType.UNCOMPRESSED) {
return buffer;
} // FIXME if the buffer is not array-implemented.
IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
- ByteBuffer uncompressedBuffer =
ByteBuffer.allocate(header.getUncompressedSize());
+ ByteBuffer uncompressedBuffer = ByteBuffer.allocate(uncompressedSize);
unCompressor.uncompress(
buffer.array(), buffer.position(), buffer.remaining(),
uncompressedBuffer.array(), 0);
return uncompressedBuffer;
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 20001fe5985..65f8ef4e410 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -308,10 +308,12 @@ struct TTsFilePieceReq{
1: required binary body
2: required string uuid
3: required common.TConsensusGroupId consensusGroupId
- // if isRelay is true, the receiver should forward the request to other
replicas in the group
- 4: optional bool isRelay
+ 4: optional common.TRegionReplicaSet relayTargets
5: optional i8 compressionType
6: optional i32 uncompressedLength
+ 7: optional bool needSchemaRegistration
+ 8: optional string username
+ 9: optional string password
}
struct TLoadCommandReq{