This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load_v2 by this push:
new 723e65834a7 fix sonar issues
723e65834a7 is described below
commit 723e65834a7f378b0317e30bcff849bcb7ee76fd
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Nov 20 15:29:48 2023 +0800
fix sonar issues
---
.../thrift/async/IoTDBThriftAsyncConnector.java | 9 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 24 +-
.../execution/load/TsFileSplitSender.java | 265 ++++++++++++++-------
.../locseq/ThroughputBasedLocationSequencer.java | 2 +-
.../nodesplit/ClusteringMeasurementSplitter.java | 61 ++---
.../analyze/partition/BasicPartitionFetcher.java | 23 +-
.../analyze/partition/ClusterPartitionFetcher.java | 1 +
.../plan/analyze/schema/SchemaValidator.java | 69 +++++-
.../planner/plan/node/load/LoadCommandNode.java | 7 +-
.../iotdb/db/storageengine/StorageEngine.java | 9 +-
.../execution/load/LoadTsFileManagerTest.java | 8 +-
.../execution/load/LoadTsFileSchedulerTest.java | 29 +--
.../db/queryengine/execution/load/TestBase.java | 10 +-
.../execution/load/TsFileSplitSenderTest.java | 50 ++--
.../org/apache/iotdb/tsfile/utils/TsFileUtils.java | 2 +-
15 files changed, 344 insertions(+), 225 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 10a901a3712..e501d322e53 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -467,15 +467,16 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
pipeTsFileInsertionEvent.getTsFiles().size(),
targetConfigNodes);
- if (splitSender.getStatistic().hasP2Timeout) {
- double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+ if (splitSender.getStatistic().isHasP2Timeout()) {
+ double throughput = splitSender.getStatistic().p2ThroughputMbps();
Map<String, Object> param = new HashMap<>(2);
param.put(
- PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS,
splitSender.getStatistic().p2Timeout);
+ PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS,
+ splitSender.getStatistic().getP2Timeout());
param.put(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY,
throughput);
pipeTsFileInsertionEvent.getExtractorOnConnectorTimeout().apply(param);
} else {
- double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+ double throughput = splitSender.getStatistic().p2ThroughputMbps();
pipeTsFileInsertionEvent
.getExtractorOnConnectorSuccess()
.apply(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ef80ec7b9a9..4578d8fe96c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -419,12 +419,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
currDevice = device;
currMeasurement = measurement;
} else if (!currDevice.equals(device)) {
- validatingSchema.devicePaths.add(new PartialPath(currDevice));
- validatingSchema.measurements.add(measurements.toArray(new
String[0]));
- validatingSchema.dataTypes.add(dataTypes.toArray(new TSDataType[0]));
- validatingSchema.encodings.add(encodings.toArray(new TSEncoding[0]));
- validatingSchema.compressionTypes.add(compressionTypes.toArray(new
CompressionType[0]));
- validatingSchema.isAlignedList.add(isAligned);
+ validatingSchema.getDevicePaths().add(new PartialPath(currDevice));
+ validatingSchema.getMeasurements().add(measurements.toArray(new
String[0]));
+ validatingSchema.getDataTypes().add(dataTypes.toArray(new
TSDataType[0]));
+ validatingSchema.getEncodings().add(encodings.toArray(new
TSEncoding[0]));
+
validatingSchema.getCompressionTypes().add(compressionTypes.toArray(new
CompressionType[0]));
+ validatingSchema.getIsAlignedList().add(isAligned);
currDevice = device;
currMeasurement = measurement;
measurements.clear();
@@ -438,12 +438,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
}
- validatingSchema.devicePaths.add(new PartialPath(currDevice));
- validatingSchema.measurements.add(measurements.toArray(new String[0]));
- validatingSchema.dataTypes.add(dataTypes.toArray(new TSDataType[0]));
- validatingSchema.encodings.add(encodings.toArray(new TSEncoding[0]));
- validatingSchema.compressionTypes.add(compressionTypes.toArray(new
CompressionType[0]));
- validatingSchema.isAlignedList.add(isAligned);
+ validatingSchema.getDevicePaths().add(new PartialPath(currDevice));
+ validatingSchema.getMeasurements().add(measurements.toArray(new
String[0]));
+ validatingSchema.getDataTypes().add(dataTypes.toArray(new TSDataType[0]));
+ validatingSchema.getEncodings().add(encodings.toArray(new TSEncoding[0]));
+ validatingSchema.getCompressionTypes().add(compressionTypes.toArray(new
CompressionType[0]));
+ validatingSchema.getIsAlignedList().add(isAligned);
return validatingSchema;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index 606e96a9435..a53eed48d68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -106,6 +106,7 @@ public class TsFileSplitSender {
private String userName;
private String password;
+ @SuppressWarnings("java:S107")
public TsFileSplitSender(
LoadTsFileNode loadTsFileNode,
DataPartitionBatchFetcher targetPartitionFetcher,
@@ -128,11 +129,11 @@ public class TsFileSplitSender {
this.userName = userName;
this.password = password;
- this.statistic.totalSize = loadTsFileNode.getTotalSize();
+ this.statistic.setTotalSize(loadTsFileNode.getTotalSize());
}
public void start() throws IOException {
- statistic.taskStartTime = System.currentTimeMillis();
+ statistic.setTaskStartTime(System.currentTimeMillis());
// skip files without data
loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
uuid = UUID.randomUUID().toString();
@@ -145,7 +146,7 @@ public class TsFileSplitSender {
} else {
logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
}
- statistic.taskEndTime = System.currentTimeMillis();
+ statistic.setTaskEndTime(System.currentTimeMillis());
locationStatistics.logLocationStatistics();
statistic.logStatistic();
}
@@ -186,11 +187,40 @@ public class TsFileSplitSender {
tsFileDataManager.sendAllTsFileData()
&& processRemainingPieceNodes()
&& phaseOneFailures.isEmpty();
- statistic.p1TimeMS = System.currentTimeMillis() - start;
- logger.info("Cleanup ends after {}ms", statistic.p1TimeMS);
+ statistic.setP1TimeMS(System.currentTimeMillis() - start);
+ logger.info("Cleanup ends after {}ms", statistic.getP1TimeMS());
return success;
}
+ private boolean loadInGroup(
+ TDataNodeLocation dataNodeLocation, TLoadCommandReq loadCommandReq)
+ throws SocketException, FragmentInstanceDispatchException,
InterruptedException {
+ TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
+
+ for (int i = 0; i < MAX_RETRY && !Thread.interrupted(); i++) {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ // record timeout for recalculating max batch size
+ if (statistic.getP2Timeout() == 0) {
+ statistic.setP2Timeout(client.getTimeout());
+ }
+
+ TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ throw new FragmentInstanceDispatchException(loadResp.status);
+ } else {
+ // if any node in this replica set succeeds, it is loaded
+ return true;
+ }
+ } catch (ClientManagerException | TException e) {
+ logger.debug("{} timed out, retrying...", endPoint, e);
+ }
+
+ Thread.sleep(RETRY_INTERVAL_MS);
+ }
+ return false;
+ }
private Void loadInGroup(
TRegionReplicaSet replicaSet, TLoadCommandReq loadCommandReq,
AtomicBoolean hasTimeout)
throws SocketException {
@@ -201,50 +231,27 @@ public class TsFileSplitSender {
uuid,
dataNodeLocation,
replicaSet.regionId);
- TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
- boolean loaded = false;
-
- for (int i = 0; i < MAX_RETRY; i++) {
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- // record timeout for recalculating max batch size
- if (statistic.p2Timeout == 0) {
- statistic.p2Timeout = client.getTimeout();
- }
-
- TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
- if (!loadResp.isAccepted()) {
- logger.warn(loadResp.message);
- locationException = new
FragmentInstanceDispatchException(loadResp.status);
- } else {
- // if any node in this replica set succeeds, it is loaded
- locationException = null;
- loaded = true;
- }
+ try {
+ if (loadInGroup(dataNodeLocation, loadCommandReq)) {
+ // if any node in this replica set succeeds, it is loaded
+ locationException = null;
break;
- } catch (ClientManagerException | TException e) {
+ } else {
+ // the location timed out
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
status.setMessage(
"can't connect to node {}, please reset longer
dn_connection_timeout_ms "
+ "in iotdb-common.properties and restart iotdb."
- + endPoint);
- locationException = new FragmentInstanceDispatchException(status);
+ + dataNodeLocation.internalEndPoint);
hasTimeout.set(true);
+ locationException = new FragmentInstanceDispatchException(status);
}
-
- try {
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- locationException = e;
- break;
- }
- }
-
- if (loaded) {
- // if any node in this replica set succeeds, it is loaded
- break;
+ } catch (FragmentInstanceDispatchException e) {
+ locationException = e;
+ } catch (InterruptedException e) {
+ locationException = e;
+ Thread.currentThread().interrupt();
}
}
@@ -281,22 +288,20 @@ public class TsFileSplitSender {
phaseTwoFailures.put(loadFuture.left.regionId, e);
}
}
- statistic.p2TimeMS = System.currentTimeMillis() - p2StartMS;
- statistic.hasP2Timeout = hasTimeout.get();
+ statistic.setP2TimeMS(System.currentTimeMillis() - p2StartMS);
+ statistic.setHasP2Timeout(hasTimeout.get());
return phaseTwoFailures.isEmpty();
}
public LocationSequencer createLocationSequencer(TRegionReplicaSet
replicaSet) {
- // return new FixedLocationSequencer(replicaSet);
- // return new RandomLocationSequencer(replicaSet);
return new ThroughputBasedLocationSequencer(replicaSet,
locationStatistics);
}
private ByteBuffer compressBuffer(ByteBuffer buffer) throws IOException {
- statistic.rawSize.addAndGet(buffer.remaining());
+ statistic.getRawSize().addAndGet(buffer.remaining());
if (compressionType.equals(CompressionType.UNCOMPRESSED)) {
- statistic.compressedSize.addAndGet(buffer.remaining());
+ statistic.getCompressedSize().addAndGet(buffer.remaining());
return buffer;
}
ICompressor compressor = ICompressor.getCompressor(compressionType);
@@ -309,7 +314,7 @@ public class TsFileSplitSender {
buffer.remaining(),
compressed.array());
compressed.limit(compressLength);
- statistic.compressedSize.addAndGet(compressLength);
+ statistic.getCompressedSize().addAndGet(compressLength);
return compressed;
}
@@ -324,10 +329,10 @@ public class TsFileSplitSender {
subNodes = pair.left.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Unexpected error during splitting node", e);
+ logger.error("Unexpected interruption during splitting node", e);
return false;
} catch (ExecutionException e) {
- logger.error("Unexpected error during splitting node", e);
+ logger.error("Unexpected execution error during splitting node", e);
return false;
}
if (!dispatchPieceNodes(subNodes, pair.right)) {
@@ -397,7 +402,7 @@ public class TsFileSplitSender {
return false;
}
long compressingTime = System.nanoTime() - startTime;
- statistic.compressingTimeNs.addAndGet(compressingTime);
+ statistic.getCompressingTimeNs().addAndGet(compressingTime);
TTsFilePieceReq loadTsFileReq = genLoadReq(buffer, replicaSet,
uncompressedLength);
LocationSequencer locationSequencer = createLocationSequencer(replicaSet);
@@ -451,7 +456,7 @@ public class TsFileSplitSender {
.map(node -> dispatchOneFinalNode(node, replicaSet))
.collect(Collectors.toList());
long elapsedTime = System.nanoTime() - start;
- statistic.dispatchNodesTimeNS.addAndGet(elapsedTime);
+ statistic.getDispatchNodesTimeNS().addAndGet(elapsedTime);
return !subNodeResults.contains(false);
}
@@ -464,7 +469,7 @@ public class TsFileSplitSender {
// split the piece node asynchronously to improve parallelism
if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode),
replicaSet));
- statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
+ statistic.getDispatchNodeTimeNS().addAndGet(System.nanoTime() -
allStart);
return true;
} else {
// wait for the first split task to complete if too many task
@@ -473,71 +478,155 @@ public class TsFileSplitSender {
try {
subNodes = pair.left.get();
long elapsedTime = System.nanoTime() - start;
- statistic.splitTime.addAndGet(elapsedTime);
- statistic.pieceNodeNum.incrementAndGet();
+ statistic.getSplitTime().addAndGet(elapsedTime);
+ statistic.getPieceNodeNum().incrementAndGet();
logger.debug(
"{} splits are generated after {}ms", subNodes.size(), elapsedTime
/ 1_000_000L);
splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode),
replicaSet));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Unexpected error during splitting node", e);
+ logger.error("Unexpected interruption during splitting node", e);
return false;
} catch (ExecutionException e) {
- logger.error("Unexpected error during splitting node", e);
+ logger.error("Unexpected execution error during splitting node", e);
return false;
}
// send the split nodes to the replicas
boolean success = dispatchPieceNodes(subNodes, pair.right);
- statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
+ statistic.getDispatchNodeTimeNS().addAndGet(System.nanoTime() -
allStart);
return success;
}
}
public static class Statistic {
- public long taskStartTime;
- public long taskEndTime;
- public AtomicLong rawSize = new AtomicLong();
- public AtomicLong compressedSize = new AtomicLong();
- public AtomicLong splitTime = new AtomicLong();
- public AtomicLong pieceNodeNum = new AtomicLong();
- public AtomicLong dispatchNodesTimeNS = new AtomicLong();
- public AtomicLong dispatchNodeTimeNS = new AtomicLong();
- public AtomicLong compressingTimeNs = new AtomicLong();
- public long p1TimeMS;
- public long p2TimeMS;
- public long totalSize;
- public boolean hasP2Timeout;
- public long p2Timeout;
+ private long taskStartTime;
+ private long taskEndTime;
+ private AtomicLong rawSize = new AtomicLong();
+ private AtomicLong compressedSize = new AtomicLong();
+ private AtomicLong splitTime = new AtomicLong();
+ private AtomicLong pieceNodeNum = new AtomicLong();
+ private AtomicLong dispatchNodesTimeNS = new AtomicLong();
+ private AtomicLong dispatchNodeTimeNS = new AtomicLong();
+ private AtomicLong compressingTimeNs = new AtomicLong();
+ private long p1TimeMS;
+ private long p2TimeMS;
+ private long totalSize;
+ private boolean hasP2Timeout;
+ private long p2Timeout;
public void logStatistic() {
logger.info(
"Time consumption: {}ms, totalSize: {}MB",
- taskEndTime - taskStartTime,
- totalSize * 1.0 / MB);
+ getTaskEndTime() - getTaskStartTime(),
+ getTotalSize() * 1.0 / MB);
logger.info(
"Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {},
dispatchNodeTime: {}",
- pieceNodeNum.get(),
- splitTime.get() / 1_000_000L,
- dispatchNodesTimeNS.get() / 1_000_000L,
- dispatchNodeTimeNS.get() / 1_000_000L);
+ getPieceNodeNum().get(),
+ getSplitTime().get() / 1_000_000L,
+ getDispatchNodesTimeNS().get() / 1_000_000L,
+ getDispatchNodeTimeNS().get() / 1_000_000L);
logger.info(
"Transmission size: {}/{} ({}), compressionTime: {}ms",
- compressedSize.get(),
- rawSize.get(),
- compressedSize.get() * 1.0 / rawSize.get(),
- compressingTimeNs.get() / 1_000_000L);
- logger.info("Sync TsFile time: {}ms ({})", p1TimeMS, p1ThroughputMBPS());
- logger.info("Load command execution time: {}ms ({})", p2TimeMS,
p2ThroughputMBPS());
+ getCompressedSize().get(),
+ getRawSize().get(),
+ getCompressedSize().get() * 1.0 / getRawSize().get(),
+ getCompressingTimeNs().get() / 1_000_000L);
+ logger.info("Sync TsFile time: {}ms ({})", getP1TimeMS(),
p1ThroughputMbps());
+ logger.info("Load command execution time: {}ms ({})", getP2TimeMS(),
p2ThroughputMbps());
+ }
+
+ public double p2ThroughputMbps() {
+ return getTotalSize() * 1.0 / MB / (getP2TimeMS() / 1000.0);
+ }
+
+ public double p1ThroughputMbps() {
+ return getTotalSize() * 1.0 / MB / (getP1TimeMS() / 1000.0);
+ }
+
+ public long getTaskStartTime() {
+ return taskStartTime;
+ }
+
+ public void setTaskStartTime(long taskStartTime) {
+ this.taskStartTime = taskStartTime;
+ }
+
+ public long getTaskEndTime() {
+ return taskEndTime;
+ }
+
+ public void setTaskEndTime(long taskEndTime) {
+ this.taskEndTime = taskEndTime;
+ }
+
+ public AtomicLong getRawSize() {
+ return rawSize;
+ }
+
+ public AtomicLong getCompressedSize() {
+ return compressedSize;
+ }
+
+ public AtomicLong getSplitTime() {
+ return splitTime;
+ }
+
+ public AtomicLong getPieceNodeNum() {
+ return pieceNodeNum;
+ }
+
+ public AtomicLong getDispatchNodesTimeNS() {
+ return dispatchNodesTimeNS;
+ }
+
+ public AtomicLong getDispatchNodeTimeNS() {
+ return dispatchNodeTimeNS;
+ }
+
+ public AtomicLong getCompressingTimeNs() {
+ return compressingTimeNs;
+ }
+
+ public long getP1TimeMS() {
+ return p1TimeMS;
+ }
+
+ public void setP1TimeMS(long p1TimeMS) {
+ this.p1TimeMS = p1TimeMS;
+ }
+
+ public long getP2TimeMS() {
+ return p2TimeMS;
+ }
+
+ public void setP2TimeMS(long p2TimeMS) {
+ this.p2TimeMS = p2TimeMS;
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ public void setTotalSize(long totalSize) {
+ this.totalSize = totalSize;
+ }
+
+ public boolean isHasP2Timeout() {
+ return hasP2Timeout;
+ }
+
+ public void setHasP2Timeout(boolean hasP2Timeout) {
+ this.hasP2Timeout = hasP2Timeout;
}
- public double p2ThroughputMBPS() {
- return totalSize * 1.0 / MB / (p2TimeMS / 1000.0);
+ public long getP2Timeout() {
+ return p2Timeout;
}
- public double p1ThroughputMBPS() {
- return totalSize * 1.0 / MB / (p1TimeMS / 1000.0);
+ public void setP2Timeout(long p2Timeout) {
+ this.p2Timeout = p2Timeout;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
index ac8254f289f..213cf5bf379 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
@@ -133,7 +133,7 @@ public class ThroughputBasedLocationSequencer implements
LocationSequencer {
.map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
.collect(Collectors.toList()));
}
- Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
+ final Pair<TDataNodeLocation, Double> chosenPair =
locations.remove(chosen);
// update ranks
double newTotalRank = 0.0;
for (Pair<TDataNodeLocation, Double> location : locations) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
index 9a4fdf3b96f..7d24273023e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
@@ -184,22 +184,16 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
newNodeA.addTsFileData(tsFileData);
} else {
ChunkData chunkData = (ChunkData) tsFileData;
- if (currMeasurement == null ||
currMeasurement.equals(chunkData.firstMeasurement())) {
+ if (currMeasurement == null ||
currMeasurement.equals(chunkData.firstMeasurement()) ||
+ sizeTarget > 0) {
// the first chunk or chunk of the same series, add it to A
+ // or the chunk of the next series and splitA is not
currMeasurement = chunkData.firstMeasurement();
newNodeA.addTsFileData(tsFileData);
sizeTarget -= chunkData.getDataSize();
- } else {
- // chunk of the next series
- if (sizeTarget < 0) {
- // splitA is full, break to fill splitB
- break;
- } else {
- // splitA is not full, also add this series to A
- currMeasurement = chunkData.firstMeasurement();
- newNodeA.addTsFileData(tsFileData);
- sizeTarget -= chunkData.getDataSize();
- }
+ } else if (sizeTarget < 0) {
+ // a new series but splitA is full, break to fill splitB
+ break;
}
}
}
@@ -455,7 +449,7 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
public class KMeans implements Clustering {
- private int k;
+ private int clusterNum;
private int maxIteration;
private double[][] centroids;
private AtomicInteger[] centroidCounters;
@@ -463,11 +457,11 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
private Map<Entry<String, double[]>, Integer> recordCentroidMapping;
private int vecLength = 0;
- public KMeans(int k, int maxIteration) {
- this.k = k;
+ public KMeans(int clusterNum, int maxIteration) {
+ this.clusterNum = clusterNum;
this.maxIteration = maxIteration;
- this.centroids = new double[k][];
- this.centroidCounters = new AtomicInteger[k];
+ this.centroids = new double[clusterNum][];
+ this.centroidCounters = new AtomicInteger[clusterNum];
for (int i = 0; i < centroidCounters.length; i++) {
centroidCounters[i] = new AtomicInteger();
}
@@ -484,26 +478,24 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
@Override
public List<List<String>> cluster(Map<String, double[]> tagVectorMap,
VectorDistance distance) {
recordCentroidMapping.clear();
- if (k > tagVectorMap.size()) {
- k = tagVectorMap.size();
- this.centroids = new double[k][];
+ if (clusterNum > tagVectorMap.size()) {
+ clusterNum = tagVectorMap.size();
+ this.centroids = new double[clusterNum][];
}
for (Entry<String, double[]> entry : tagVectorMap.entrySet()) {
vecLength = entry.getValue().length;
}
- randomCentroid(vecLength, tagVectorMap);
+ randomCentroid(tagVectorMap);
- for (int i = 0; i < maxIteration; i++) {
+ for (int i = 0; i < maxIteration && System.currentTimeMillis() -
splitStartTime <= splitTimeBudget; i++) {
if (!assignCentroid(tagVectorMap, distance)) {
+ // centroid not updated, end
break;
}
newCentroid();
clearCentroidCounter();
- if (System.currentTimeMillis() - splitStartTime > splitTimeBudget) {
- break;
- }
}
Map<Integer, List<Entry<String, double[]>>> centroidRecordMap =
@@ -542,9 +534,9 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
List<Entry<String, double[]>> records = e.getValue();
int recordNum = records.size();
double[] sumVec = new double[vecLength];
- for (Entry<String, double[]> record : records) {
+ for (Entry<String, double[]> rec : records) {
for (int i = 0; i < sumVec.length; i++) {
- sumVec[i] += record.getValue()[i];
+ sumVec[i] += rec.getValue()[i];
}
}
for (int i = 0; i < sumVec.length; i++) {
@@ -590,27 +582,16 @@ public class ClusteringMeasurementSplitter implements
PieceNodeSplitter {
return centroidUpdated.get();
}
- private void randomCentroid(int vecLength, Map<String, double[]>
tagVectorMap) {
+ private void randomCentroid(Map<String, double[]> tagVectorMap) {
pickRandomCentroid(tagVectorMap);
- // genRandomCentroid(vecLength);
}
private void pickRandomCentroid(Map<String, double[]> tagVectorMap) {
List<double[]> recordVectors = new ArrayList<>(tagVectorMap.values());
Collections.shuffle(recordVectors);
- for (int i = 0; i < k; i++) {
+ for (int i = 0; i < clusterNum; i++) {
centroids[i] = recordVectors.get(i);
}
}
-
- private void genRandomCentroid(int vecLength) {
- for (int i = 0; i < k; i++) {
- double[] centroid = new double[vecLength];
- for (int j = 0; j < vecLength; j++) {
- centroid[j] = random.nextDouble();
- }
- centroids[i] = centroid;
- }
- }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
index 98e50f2720b..6f7da416cb9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
@@ -60,6 +60,9 @@ import java.util.Set;
public abstract class BasicPartitionFetcher implements IPartitionFetcher {
+ private static final String ERR_GET_DATA_PARTITION = "An error occurred when
executing getDataPartition():";
+ private static final String ERR_GET_OR_CREATE_DATA_PARTITION = "An error
occurred when executing getOrCreateDataPartition():";
+
protected static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
protected final SeriesPartitionExecutor partitionExecutor;
@@ -92,7 +95,7 @@ public abstract class BasicPartitionFetcher implements
IPartitionFetcher {
partitionCache.updateSchemaPartitionCache(
schemaPartitionTableResp.getSchemaPartitionTable());
} else {
- throw new RuntimeException(
+ throw new IllegalStateException(
new IoTDBException(
schemaPartitionTableResp.getStatus().getMessage(),
schemaPartitionTableResp.getStatus().getCode()));
@@ -122,7 +125,7 @@ public abstract class BasicPartitionFetcher implements
IPartitionFetcher {
partitionCache.updateSchemaPartitionCache(
schemaPartitionTableResp.getSchemaPartitionTable());
} else {
- throw new RuntimeException(
+ throw new IllegalStateException(
new IoTDBException(
schemaPartitionTableResp.getStatus().getMessage(),
schemaPartitionTableResp.getStatus().getCode()));
@@ -165,12 +168,12 @@ public abstract class BasicPartitionFetcher implements
IPartitionFetcher {
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
} else {
throw new StatementAnalyzeException(
- "An error occurred when executing getDataPartition():"
+ ERR_GET_DATA_PARTITION
+ dataPartitionTableResp.getStatus().getMessage());
}
} catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
- "An error occurred when executing getDataPartition():" +
e.getMessage());
+ ERR_GET_DATA_PARTITION + e.getMessage());
}
}
return dataPartition;
@@ -190,12 +193,12 @@ public abstract class BasicPartitionFetcher implements
IPartitionFetcher {
return parseDataPartitionResp(dataPartitionTableResp);
} else {
throw new StatementAnalyzeException(
- "An error occurred when executing getDataPartition():"
+ ERR_GET_DATA_PARTITION
+ dataPartitionTableResp.getStatus().getMessage());
}
} catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
- "An error occurred when executing getDataPartition():" +
e.getMessage());
+ ERR_GET_DATA_PARTITION + e.getMessage());
}
}
@@ -214,12 +217,12 @@ public abstract class BasicPartitionFetcher implements
IPartitionFetcher {
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
} else {
throw new StatementAnalyzeException(
- "An error occurred when executing getOrCreateDataPartition():"
+ ERR_GET_OR_CREATE_DATA_PARTITION
+ dataPartitionTableResp.getStatus().getMessage());
}
} catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
- "An error occurred when executing getOrCreateDataPartition():" +
e.getMessage());
+ ERR_GET_OR_CREATE_DATA_PARTITION + e.getMessage());
}
}
return dataPartition;
@@ -243,14 +246,14 @@ public abstract class BasicPartitionFetcher implements
IPartitionFetcher {
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
} else {
- throw new RuntimeException(
+ throw new IllegalStateException(
new IoTDBException(
dataPartitionTableResp.getStatus().getMessage(),
dataPartitionTableResp.getStatus().getCode()));
}
} catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
- "An error occurred when executing getOrCreateDataPartition():" +
e.getMessage());
+ ERR_GET_OR_CREATE_DATA_PARTITION + e.getMessage());
}
}
return dataPartition;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
index b38346c150c..c7f6a7bde03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache;
+@SuppressWarnings("java:S6548")
public class ClusterPartitionFetcher extends BasicPartitionFetcher {
private final IClientManager<ConfigRegionId, ConfigNodeClient>
configNodeClientManager =
ConfigNodeClientManager.getInstance();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 78bf3023943..81b295eb066 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -55,6 +55,7 @@ public class SchemaValidator {
}
}
+ @SuppressWarnings("java:S107")
public static ISchemaTree validate(
ISchemaFetcher schemaFetcher,
List<PartialPath> devicePaths,
@@ -71,22 +72,66 @@ public class SchemaValidator {
public static ISchemaTree validate(
ISchemaFetcher schemaFetcher, ValidatingSchema validatingSchema,
MPPQueryContext context) {
return schemaFetcher.fetchSchemaListWithAutoCreate(
- validatingSchema.devicePaths,
- validatingSchema.measurements,
- validatingSchema.dataTypes,
- validatingSchema.encodings,
- validatingSchema.compressionTypes,
- validatingSchema.isAlignedList,
+ validatingSchema.getDevicePaths(),
+ validatingSchema.getMeasurements(),
+ validatingSchema.getDataTypes(),
+ validatingSchema.getEncodings(),
+ validatingSchema.getCompressionTypes(),
+ validatingSchema.getIsAlignedList(),
context);
}
public static class ValidatingSchema {
- public List<PartialPath> devicePaths = new ArrayList<>();
- public List<String[]> measurements = new ArrayList<>();
- public List<TSDataType[]> dataTypes = new ArrayList<>();
- public List<TSEncoding[]> encodings = new ArrayList<>();
- public List<CompressionType[]> compressionTypes = new ArrayList<>();
- public List<Boolean> isAlignedList = new ArrayList<>();
+ private List<PartialPath> devicePaths = new ArrayList<>();
+ private List<String[]> measurements = new ArrayList<>();
+ private List<TSDataType[]> dataTypes = new ArrayList<>();
+ private List<TSEncoding[]> encodings = new ArrayList<>();
+ private List<CompressionType[]> compressionTypes = new ArrayList<>();
+ private List<Boolean> isAlignedList = new ArrayList<>();
+
+ public List<PartialPath> getDevicePaths() {
+ return devicePaths;
+ }
+
+ public List<String[]> getMeasurements() {
+ return measurements;
+ }
+
+ public void setMeasurements(List<String[]> measurements) {
+ this.measurements = measurements;
+ }
+
+ public List<TSDataType[]> getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(
+ List<TSDataType[]> dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ public List<TSEncoding[]> getEncodings() {
+ return encodings;
+ }
+
+ public void setEncodings(
+ List<TSEncoding[]> encodings) {
+ this.encodings = encodings;
+ }
+
+ public List<CompressionType[]> getCompressionTypes() {
+ return compressionTypes;
+ }
+
+ public void setCompressionTypes(
+ List<CompressionType[]> compressionTypes) {
+ this.compressionTypes = compressionTypes;
+ }
+
+ public List<Boolean> getIsAlignedList() {
+ return isAlignedList;
+ }
+
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
index 3d42a401b70..b39a174d99c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
@@ -62,11 +62,14 @@ public class LoadCommandNode extends PlanNode {
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ // no children for this plan
+ }
+ @SuppressWarnings({"java:S2975", "java:S1182"})
@Override
public PlanNode clone() {
- return null;
+ return new LoadCommandNode(getPlanNodeId(), loadCommand, uuid,
consensusGroupId, isGeneratedByPipe);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index a759829024b..32a83c0761b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -101,6 +101,7 @@ import static
org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
public class StorageEngine implements IService {
+ private static final String MSG_NO_SUCH_UUID = "No load TsFile uuid %s
recorded for execute load command %s.";
private static final Logger logger =
LoggerFactory.getLogger(StorageEngine.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -807,7 +808,7 @@ public class StorageEngine implements IService {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(
String.format(
- "No load TsFile uuid %s recorded for execute load command
%s.",
+ MSG_NO_SUCH_UUID,
loadCommandNode.getUuid(),
loadCommandNode.getLoadCommand()));
}
break;
@@ -819,7 +820,7 @@ public class StorageEngine implements IService {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(
String.format(
- "No load TsFile uuid %s recorded for execute load command
%s.",
+ MSG_NO_SUCH_UUID,
loadCommandNode.getUuid(),
loadCommandNode.getLoadCommand()));
}
break;
@@ -851,7 +852,7 @@ public class StorageEngine implements IService {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(
String.format(
- "No load TsFile uuid %s recorded for execute load command
%s.",
+ MSG_NO_SUCH_UUID,
uuid, loadCommand));
}
break;
@@ -862,7 +863,7 @@ public class StorageEngine implements IService {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(
String.format(
- "No load TsFile uuid %s recorded for execute load command
%s.",
+ MSG_NO_SUCH_UUID,
uuid, loadCommand));
}
break;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
index 26b7745a451..38084a86c6d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@ -136,7 +136,7 @@ public class LoadTsFileManagerTest extends TestBase {
}
}
- public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint)
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndPoint)
throws TException, IOException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
@@ -165,7 +165,7 @@ public class LoadTsFileManagerTest extends TestBase {
.forEach(
dataNodeLocation -> {
TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
- if (!otherPoint.equals(tEndpoint)) {
+ if (!otherPoint.equals(tEndPoint)) {
try {
handleTsFilePieceNode(req, otherPoint);
} catch (TException | IOException e) {
@@ -180,7 +180,7 @@ public class LoadTsFileManagerTest extends TestBase {
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint)
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndPoint)
throws LoadFileException, IOException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
@@ -191,7 +191,7 @@ public class LoadTsFileManagerTest extends TestBase {
TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
- if (!otherPoint.equals(tEndpoint)) {
+ if (!otherPoint.equals(tEndPoint)) {
handleTsLoadCommand(req, otherPoint);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
index 9e226eb2dca..9ae155d18c0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
@@ -109,7 +109,7 @@ public class LoadTsFileSchedulerTest extends TestBase {
System.out.printf("Split ends after %dms", timeConsumption);
}
- public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint) {
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndPoint) {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
LoadTsFilePieceNode pieceNode =
@@ -117,7 +117,7 @@ public class LoadTsFileSchedulerTest extends TestBase {
Set<Integer> splitIds =
phaseOneResults
.computeIfAbsent(
- tEndpoint,
+ tEndPoint,
e -> new
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
.computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
@@ -132,9 +132,9 @@ public class LoadTsFileSchedulerTest extends TestBase {
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint) {
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndPoint) {
phaseTwoResults
- .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(tEndPoint, e -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> req.commandType);
return new TLoadResp()
@@ -145,16 +145,16 @@ public class LoadTsFileSchedulerTest extends TestBase {
public void printPhaseResult() {
System.out.print("Phase one:\n");
for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File,
Set<Integer>>>>>
- tEndPointMapEntry : phaseOneResults.entrySet()) {
- TEndPoint endPoint = tEndPointMapEntry.getKey();
+ entry : phaseOneResults.entrySet()) {
+ TEndPoint endPoint = entry.getKey();
for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
consensusGroupIdMapEntry :
- tEndPointMapEntry.getValue().entrySet()) {
+ entry.getValue().entrySet()) {
ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
int chunkNum = 0;
int fileNum = 0;
int taskNum = 0;
for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
- consensusGroupIdMapEntry.getValue().entrySet()) {;
+ consensusGroupIdMapEntry.getValue().entrySet()) {
taskNum += 1;
for (Entry<File, Set<Integer>> fileListEntry :
stringMapEntry.getValue().entrySet()) {
Set<Integer> chunks = fileListEntry.getValue();
@@ -165,20 +165,13 @@ public class LoadTsFileSchedulerTest extends TestBase {
System.out.printf(
"%s - %s - %s tasks - %s files - %s chunks\n",
endPoint, consensusGroupId, taskNum, fileNum, chunkNum);
- // if (consensusGroupId.getId() == 0) {
- // // d1, non-aligned series
- // assertEquals(expectedChunkNum() / 2, chunkNum);
- // } else {
- // // d2, aligned series
- // assertEquals(expectedChunkNum() / 2 / seriesNum, chunkNum);
- // }
}
}
System.out.print("Phase two:\n");
- for (Entry<TEndPoint, Map<String, Integer>> tEndPointMapEntry :
phaseTwoResults.entrySet()) {
- TEndPoint endPoint = tEndPointMapEntry.getKey();
- for (Entry<String, Integer> stringMapEntry :
tEndPointMapEntry.getValue().entrySet()) {
+ for (Entry<TEndPoint, Map<String, Integer>> entry :
phaseTwoResults.entrySet()) {
+ TEndPoint endPoint = entry.getKey();
+ for (Entry<String, Integer> stringMapEntry :
entry.getValue().entrySet()) {
String uuid = stringMapEntry.getKey();
int command = stringMapEntry.getValue();
System.out.printf("%s - %s - %s\n", endPoint, uuid,
LoadCommand.values()[command]);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index 3ef4c756386..ea8d526ba5a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -164,14 +164,14 @@ public class TestBase {
return (splitChunkNum + fileNum) * seriesNum * deviceNum;
}
- public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint)
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndPoint)
throws TException, IOException {
return new TLoadResp()
.setAccepted(true)
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint)
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndPoint)
throws LoadFileException, IOException {
return new TLoadResp()
.setAccepted(true)
@@ -420,12 +420,12 @@ public class TestBase {
public static String getTestTsFilePath(
String logicalStorageGroupName,
- long VirtualStorageGroupId,
- long TimePartitionId,
+ long virtualStorageGroupId,
+ long timePartitionId,
long tsFileVersion) {
String filePath =
String.format(
- TEST_TSFILE_PATH, logicalStorageGroupName, VirtualStorageGroupId,
TimePartitionId);
+ TEST_TSFILE_PATH, logicalStorageGroupName, virtualStorageGroupId,
timePartitionId);
return TsFileGeneratorUtils.getTsFilePath(filePath, tsFileVersion);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index bd903c45548..a682263e742 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
import static org.junit.Assert.assertEquals;
+@SuppressWarnings("java:S2925")
public class TsFileSplitSenderTest extends TestBase {
private static final Logger logger =
LoggerFactory.getLogger(TsFileSplitSenderTest.class);
@@ -128,7 +129,7 @@ public class TsFileSplitSenderTest extends TestBase {
thread.interrupt();
printPhaseResult();
- long transmissionTime = splitSender.getStatistic().compressedSize.get() /
nodeThroughput;
+ long transmissionTime =
splitSender.getStatistic().getCompressedSize().get() / nodeThroughput;
System.out.printf(
"Split ends after %dms + %dms (Transmission) = %dms\n",
timeConsumption, transmissionTime, timeConsumption + transmissionTime);
@@ -139,30 +140,30 @@ public class TsFileSplitSenderTest extends TestBase {
System.out.printf("Memory usage %dMB\n", maxMemoryUsage.get() / MB);
}
- public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndpoint)
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint
tEndPoint)
throws TException, IOException {
- long handleStart = System.nanoTime();
- if ((tEndpoint.getPort() - 10000) % 3 == 0
+ final long handleStart = System.nanoTime();
+ if ((tEndPoint.getPort() - 10000) % 3 == 0
&& random.nextDouble() < packetLossRatio
&& req.relayTargets != null) {
throw new TException("Packet lost");
}
- if ((tEndpoint.getPort() - 10000) % 3 == 1
+ if ((tEndPoint.getPort() - 10000) % 3 == 1
&& random.nextDouble() < packetLossRatio / 2
&& req.relayTargets != null) {
throw new TException("Packet lost");
}
- if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null &&
stuckDurationMS > 0) {
+ if ((tEndPoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null &&
stuckDurationMS > 0) {
Pair<Long, Long> nextStuckTime =
nextStuckTimeMap.computeIfAbsent(
- tEndpoint,
+ tEndPoint,
e ->
new Pair<>(
System.currentTimeMillis(), System.currentTimeMillis() +
stuckDurationMS));
long currTime = System.currentTimeMillis();
if (currTime >= nextStuckTime.left && currTime < nextStuckTime.right) {
- logger.debug("Node{} stalls", tEndpoint.getPort() - 10000);
+ logger.debug("Node{} stalls", tEndPoint.getPort() - 10000);
try {
Thread.sleep(nextStuckTime.right - currTime);
} catch (InterruptedException e) {
@@ -170,7 +171,7 @@ public class TsFileSplitSenderTest extends TestBase {
}
} else if (currTime > nextStuckTime.right) {
nextStuckTimeMap.compute(
- tEndpoint,
+ tEndPoint,
(endPoint, newInterval) -> {
if (newInterval != null && currTime < newInterval.right) {
return newInterval;
@@ -185,8 +186,6 @@ public class TsFileSplitSenderTest extends TestBase {
}
long decompressStart = System.nanoTime();
- ConsensusGroupId groupId =
-
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
ByteBuffer buf = req.body.slice();
if (req.isSetCompressionType()) {
CompressionType compressionType =
CompressionType.deserialize(req.compressionType);
@@ -203,10 +202,13 @@ public class TsFileSplitSenderTest extends TestBase {
long deserializeStart = System.nanoTime();
LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode)
PlanNodeType.deserialize(buf);
deserializeTime.addAndGet(System.nanoTime() - deserializeStart);
+
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
Set<Integer> splitIds =
phaseOneResults
.computeIfAbsent(
- tEndpoint,
+ tEndPoint,
e -> new
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
.computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
@@ -217,14 +219,14 @@ public class TsFileSplitSenderTest extends TestBase {
.collect(Collectors.toList()));
if (dummyDelayMS > 0) {
- if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null) {
+ if ((tEndPoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null) {
try {
Thread.sleep(dummyDelayMS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
- if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.relayTargets != null) {
+ if ((tEndPoint.getPort() - 10000) % 3 == 1 && req.relayTargets != null) {
try {
Thread.sleep(dummyDelayMS / 2);
} catch (InterruptedException e) {
@@ -243,7 +245,7 @@ public class TsFileSplitSenderTest extends TestBase {
.forEach(
dataNodeLocation -> {
TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
- if (!otherPoint.equals(tEndpoint)) {
+ if (!otherPoint.equals(tEndPoint)) {
try {
handleTsFilePieceNode(req, otherPoint);
} catch (TException | IOException e) {
@@ -260,12 +262,12 @@ public class TsFileSplitSenderTest extends TestBase {
.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndpoint) {
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint
tEndPoint) {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
phaseTwoResults
.computeIfAbsent(
- tEndpoint,
+ tEndPoint,
e -> new
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
.computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
.computeIfAbsent(req.uuid, id -> req.commandType);
@@ -276,7 +278,7 @@ public class TsFileSplitSenderTest extends TestBase {
TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
for (TDataNodeLocation dataNodeLocation :
regionReplicaSet.getDataNodeLocations()) {
TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
- if (!otherPoint.equals(tEndpoint)) {
+ if (!otherPoint.equals(tEndPoint)) {
handleTsLoadCommand(req, otherPoint);
}
}
@@ -290,10 +292,10 @@ public class TsFileSplitSenderTest extends TestBase {
public void printPhaseResult() {
System.out.print("Phase one:\n");
for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File,
Set<Integer>>>>>
- tEndPointMapEntry : phaseOneResults.entrySet()) {
- TEndPoint endPoint = tEndPointMapEntry.getKey();
+ endPointMapEntry : phaseOneResults.entrySet()) {
+ TEndPoint endPoint = endPointMapEntry.getKey();
for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>
consensusGroupIdMapEntry :
- tEndPointMapEntry.getValue().entrySet()) {
+ endPointMapEntry.getValue().entrySet()) {
ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
consensusGroupIdMapEntry.getValue().entrySet()) {
@@ -317,11 +319,11 @@ public class TsFileSplitSenderTest extends TestBase {
}
System.out.print("Phase two:\n");
- for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>>
tEndPointMapEntry :
+ for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>>
endPointMapEntryValue :
phaseTwoResults.entrySet()) {
- TEndPoint endPoint = tEndPointMapEntry.getKey();
+ TEndPoint endPoint = endPointMapEntryValue.getKey();
for (Entry<ConsensusGroupId, Map<String, Integer>>
consensusGroupIdMapEntry :
- tEndPointMapEntry.getValue().entrySet()) {
+ endPointMapEntryValue.getValue().entrySet()) {
ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
for (Entry<String, Integer> stringMapEntry :
consensusGroupIdMapEntry.getValue().entrySet()) {
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
index c54028d513c..c86d597a734 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
@@ -69,7 +69,7 @@ public class TsFileUtils {
throws IOException {
if (uncompressedSize == 0 || type == CompressionType.UNCOMPRESSED) {
return buffer;
- } // FIXME if the buffer is not array-implemented.
+ }
IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
ByteBuffer uncompressedBuffer = ByteBuffer.allocate(uncompressedSize);
unCompressor.uncompress(