This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 32e30b3217a Tsfile load: fix bugs & smells reported by SonarCloud
(#10332)
32e30b3217a is described below
commit 32e30b3217aed468a200433b9a79a78d6f05f8e1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 29 11:04:43 2023 +0800
Tsfile load: fix bugs & smells reported by SonarCloud (#10332)
---
.../execution/load/LoadTsFileManager.java | 8 ++-
.../queryengine/execution/load/TsFileSplitter.java | 3 +-
.../plan/node/load/LoadSingleTsFileNode.java | 39 +++++++++--
.../planner/plan/node/load/LoadTsFileNode.java | 39 ++++++++---
.../plan/node/load/LoadTsFilePieceNode.java | 29 ++++++++-
.../scheduler/load/LoadTsFileDispatcherImpl.java | 76 ++++++++++++----------
.../plan/scheduler/load/LoadTsFileScheduler.java | 60 +++++++++--------
7 files changed, 171 insertions(+), 83 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index f26529df255..d2199f77cc9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -307,8 +307,12 @@ public class LoadTsFileManager {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
DataPartitionInfo that = (DataPartitionInfo) o;
return Objects.equals(dataRegion, that.dataRegion)
&& timePartitionSlot.getStartTime() ==
that.timePartitionSlot.getStartTime();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
index 34b19ad2c6c..721360ebbee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
@@ -252,7 +252,6 @@ public class TsFileSplitter {
reader.readPageHeader(
header.getDataType(),
(header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
- long pageDataSize = pageHeader.getSerializedPageSize();
List<AlignedChunkData> alignedChunkDataList =
pageIndex2ChunkData.get(pageIndex);
for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
if (!allChunkData.contains(alignedChunkData)) {
@@ -272,7 +271,7 @@ public class TsFileSplitter {
alignedChunkData.writeDecodeValuePage(times, values,
header.getDataType());
}
}
-
+ long pageDataSize = pageHeader.getSerializedPageSize();
pageIndex += 1;
dataSize -= pageDataSize;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 199d05690b2..d536a357412 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -43,8 +43,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
@@ -151,11 +153,13 @@ public class LoadSingleTsFileNode extends WritePlanNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return Collections.emptyList();
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ // Do nothing
+ }
@Override
public PlanNode clone() {
@@ -169,14 +173,18 @@ public class LoadSingleTsFileNode extends WritePlanNode {
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return Collections.emptyList();
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ // Do nothing
+ }
@Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {}
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ // Do nothing
+ }
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -206,4 +214,25 @@ public class LoadSingleTsFileNode extends WritePlanNode {
logger.warn(String.format("Delete After Loading %s error.", tsFile), e);
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LoadSingleTsFileNode loadSingleTsFileNode = (LoadSingleTsFileNode) o;
+ return Objects.equals(tsFile, loadSingleTsFileNode.tsFile)
+ && Objects.equals(resource, loadSingleTsFileNode.resource)
+ && Objects.equals(needDecodeTsFile,
loadSingleTsFileNode.needDecodeTsFile)
+ && Objects.equals(deleteAfterLoad,
loadSingleTsFileNode.deleteAfterLoad)
+ && Objects.equals(localRegionReplicaSet,
loadSingleTsFileNode.localRegionReplicaSet);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tsFile, resource, needDecodeTsFile, deleteAfterLoad,
localRegionReplicaSet);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 1ae27ba569c..f304678fc3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -28,17 +28,15 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
public class LoadTsFileNode extends WritePlanNode {
- private static final Logger logger =
LoggerFactory.getLogger(LoadTsFileNode.class);
private final List<TsFileResource> resources;
@@ -58,11 +56,13 @@ public class LoadTsFileNode extends WritePlanNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return Collections.emptyList();
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ // Do nothing
+ }
@Override
public PlanNode clone() {
@@ -76,14 +76,18 @@ public class LoadTsFileNode extends WritePlanNode {
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return Collections.emptyList();
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {}
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ // Do nothing
+ }
@Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {}
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ // Do nothing
+ }
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -94,4 +98,21 @@ public class LoadTsFileNode extends WritePlanNode {
}
return res;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LoadTsFileNode loadTsFileNode = (LoadTsFileNode) o;
+ return Objects.equals(resources, loadTsFileNode.resources);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resources);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index 30de17ca433..ab7f9367e8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -42,7 +42,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
public class LoadTsFilePieceNode extends WritePlanNode {
private static final Logger logger =
LoggerFactory.getLogger(LoadTsFilePieceNode.class);
@@ -87,11 +89,13 @@ public class LoadTsFilePieceNode extends WritePlanNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return Collections.emptyList();
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ // Do nothing
+ }
@Override
public PlanNode clone() {
@@ -105,7 +109,7 @@ public class LoadTsFilePieceNode extends WritePlanNode {
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return Collections.emptyList();
}
@Override
@@ -161,6 +165,25 @@ public class LoadTsFilePieceNode extends WritePlanNode {
}
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LoadTsFilePieceNode loadTsFilePieceNode = (LoadTsFilePieceNode) o;
+ return Objects.equals(tsFile, loadTsFilePieceNode.tsFile)
+ && Objects.equals(dataSize, loadTsFilePieceNode.dataSize)
+ && Objects.equals(tsFileDataList, loadTsFilePieceNode.tsFileDataList);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tsFile, dataSize, tsFileDataList);
+ }
+
@Override
public String toString() {
return "LoadTsFilePieceNode{" + "tsFile=" + tsFile + ", dataSize=" +
dataSize + '}';
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 3be57bc09eb..d23a72c8d32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -93,7 +93,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
dispatchOneInstance(instance);
} catch (FragmentInstanceDispatchException e) {
return new FragInstanceDispatchResult(e.getFailureStatus());
- } catch (Throwable t) {
+ } catch (Exception t) {
logger.warn("cannot dispatch FI for load operation", t);
return new FragInstanceDispatchResult(
RpcUtils.getStatus(
@@ -139,15 +139,49 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
logger.warn(loadResp.message);
throw new FragmentInstanceDispatchException(loadResp.status);
}
+ } catch (ClientManagerException | TException e) {
+ String warning = "Can't connect to node {}";
+ logger.warn(warning, endPoint, e);
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage(warning + endPoint);
+ throw new FragmentInstanceDispatchException(status);
+ }
+ }
+
+ private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint
endPoint)
+ throws FragmentInstanceDispatchException {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ throw new FragmentInstanceDispatchException(loadResp.status);
+ }
} catch (ClientManagerException | TException e) {
logger.warn("can't connect to node {}", endPoint, e);
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage("can't connect to node {}" + endPoint);
+ status.setMessage(
+ "can't connect to node {}, please reset longer
dn_connection_timeout_ms "
+ + "in iotdb-common.properties and restart iotdb."
+ + endPoint);
throw new FragmentInstanceDispatchException(status);
}
}
+ private void dispatchLocally(TLoadCommandReq loadCommandReq)
+ throws FragmentInstanceDispatchException {
+ TSStatus resultStatus =
+ StorageEngine.getInstance()
+ .executeLoadCommand(
+
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
+ loadCommandReq.uuid);
+ if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
+ throw new FragmentInstanceDispatchException(resultStatus);
+ }
+ }
+
public void dispatchLocally(FragmentInstance instance) throws
FragmentInstanceDispatchException {
logger.info("Receive load node from uuid {}.", uuid);
@@ -206,7 +240,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
}
} catch (FragmentInstanceDispatchException e) {
return immediateFuture(new
FragInstanceDispatchResult(e.getFailureStatus()));
- } catch (Throwable t) {
+ } catch (Exception t) {
logger.warn("cannot dispatch LoadCommand for load operation", t);
return immediateFuture(
new FragInstanceDispatchResult(
@@ -217,38 +251,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
return immediateFuture(new FragInstanceDispatchResult(true));
}
- private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint
endPoint)
- throws FragmentInstanceDispatchException {
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
- if (!loadResp.isAccepted()) {
- logger.warn(loadResp.message);
- throw new FragmentInstanceDispatchException(loadResp.status);
- }
- } catch (ClientManagerException | TException e) {
- logger.warn("can't connect to node {}", endPoint, e);
- TSStatus status = new TSStatus();
- status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage(
- "can't connect to node {}, please reset longer
dn_connection_timeout_ms in iotdb-common.properties and restart iotdb."
- + endPoint);
- throw new FragmentInstanceDispatchException(status);
- }
- }
-
- private void dispatchLocally(TLoadCommandReq loadCommandReq)
- throws FragmentInstanceDispatchException {
- TSStatus resultStatus =
- StorageEngine.getInstance()
- .executeLoadCommand(
-
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
- loadCommandReq.uuid);
- if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
- throw new FragmentInstanceDispatchException(resultStatus);
- }
- }
-
@Override
- public void abort() {}
+ public void abort() {
+ // Do nothing
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 73ec5260df7..4d8eff3110c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -137,9 +137,8 @@ public class LoadTsFileScheduler implements IScheduler {
try {
if (node.isTsFileEmpty()) {
logger.info(
- String.format(
- "Load skip TsFile %s, because it has no data.",
- node.getTsFileResource().getTsFilePath()));
+ "Load skip TsFile {}, because it has no data.",
+ node.getTsFileResource().getTsFilePath());
} else if (!node.needDecodeTsFile(
partitionFetcher::queryDataPartition)) { // do not decode, load
locally
@@ -162,15 +161,17 @@ public class LoadTsFileScheduler implements IScheduler {
}
if (isLoadSingleTsFileSuccess) {
logger.info(
- String.format(
- "Load TsFile %s Successfully, load process [%s/%s]",
- node.getTsFileResource().getTsFilePath(), i + 1,
tsFileNodeListSize));
+ "Load TsFile {} Successfully, load process [{}/{}]",
+ node.getTsFileResource().getTsFilePath(),
+ i + 1,
+ tsFileNodeListSize);
} else {
isLoadSuccess = false;
logger.warn(
- String.format(
- "Can not Load TsFile %s, load process [%s/%s]",
- node.getTsFileResource().getTsFilePath(), i + 1,
tsFileNodeListSize));
+ "Can not Load TsFile {}, load process [{}/{}]",
+ node.getTsFileResource().getTsFilePath(),
+ i + 1,
+ tsFileNodeListSize);
}
} catch (Exception e) {
isLoadSuccess = false;
@@ -236,12 +237,12 @@ public class LoadTsFileScheduler implements IScheduler {
if (!result.isSuccessful()) {
// TODO: retry.
logger.warn(
- String.format(
- "Dispatch one piece to ReplicaSet %s error. Result status code
%s. Result status message %s. Dispatch piece node error:%n%s",
- replicaSet,
-
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
- result.getFailureStatus().getMessage(),
- pieceNode));
+ "Dispatch one piece to ReplicaSet {} error. Result status code {}.
"
+ + "Result status message {}. Dispatch piece node error:%n{}",
+ replicaSet,
+
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
+ result.getFailureStatus().getMessage(),
+ pieceNode);
if (result.getFailureStatus().getSubStatus() != null) {
for (TSStatus status : result.getFailureStatus().getSubStatus()) {
logger.warn(
@@ -287,13 +288,13 @@ public class LoadTsFileScheduler implements IScheduler {
if (!result.isSuccessful()) {
// TODO: retry.
logger.warn(
- String.format(
- "Dispatch load command %s of TsFile %s error to replicaSets %s
error. Result status code %s. Result status message %s.",
- loadCommandReq,
- tsFile,
- allReplicaSets,
-
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
- result.getFailureStatus().getMessage()));
+ "Dispatch load command {} of TsFile {} error to replicaSets {}
error. "
+ + "Result status code {}. Result status message {}.",
+ loadCommandReq,
+ tsFile,
+ allReplicaSets,
+
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
+ result.getFailureStatus().getMessage());
TSStatus status = result.getFailureStatus();
status.setMessage(
String.format("Load %s error in 2nd phase. Because ", tsFile) +
status.getMessage());
@@ -327,7 +328,8 @@ public class LoadTsFileScheduler implements IScheduler {
} catch (FragmentInstanceDispatchException e) {
logger.warn(
String.format(
- "Dispatch tsFile %s error to local error. Result status code %s.
Result status message %s.",
+ "Dispatch tsFile %s error to local error. Result status code %s.
"
+ + "Result status message %s.",
node.getTsFileResource().getTsFile(),
TSStatusCode.representOf(e.getFailureStatus().getCode()).name(),
e.getFailureStatus().getMessage()));
@@ -338,7 +340,9 @@ public class LoadTsFileScheduler implements IScheduler {
}
@Override
- public void stop(Throwable t) {}
+ public void stop(Throwable t) {
+ // Do nothing
+ }
@Override
public Duration getTotalCpuTime() {
@@ -351,10 +355,14 @@ public class LoadTsFileScheduler implements IScheduler {
}
@Override
- public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable
failureCause) {}
+ public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable
failureCause) {
+ // Do nothing
+ }
@Override
- public void cancelFragment(PlanFragmentId planFragmentId) {}
+ public void cancelFragment(PlanFragmentId planFragmentId) {
+ // Do nothing
+ }
public enum LoadCommand {
EXECUTE,