This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e30b3217a Tsfile load: fix bugs & smells reported by SonarCloud 
(#10332)
32e30b3217a is described below

commit 32e30b3217aed468a200433b9a79a78d6f05f8e1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 29 11:04:43 2023 +0800

    Tsfile load: fix bugs & smells reported by SonarCloud (#10332)
---
 .../execution/load/LoadTsFileManager.java          |  8 ++-
 .../queryengine/execution/load/TsFileSplitter.java |  3 +-
 .../plan/node/load/LoadSingleTsFileNode.java       | 39 +++++++++--
 .../planner/plan/node/load/LoadTsFileNode.java     | 39 ++++++++---
 .../plan/node/load/LoadTsFilePieceNode.java        | 29 ++++++++-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 76 ++++++++++++----------
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 60 +++++++++--------
 7 files changed, 171 insertions(+), 83 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index f26529df255..d2199f77cc9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -307,8 +307,12 @@ public class LoadTsFileManager {
 
     @Override
     public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
       DataPartitionInfo that = (DataPartitionInfo) o;
       return Objects.equals(dataRegion, that.dataRegion)
           && timePartitionSlot.getStartTime() == 
that.timePartitionSlot.getStartTime();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
index 34b19ad2c6c..721360ebbee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
@@ -252,7 +252,6 @@ public class TsFileSplitter {
                   reader.readPageHeader(
                       header.getDataType(),
                       (header.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
-              long pageDataSize = pageHeader.getSerializedPageSize();
               List<AlignedChunkData> alignedChunkDataList = 
pageIndex2ChunkData.get(pageIndex);
               for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
                 if (!allChunkData.contains(alignedChunkData)) {
@@ -272,7 +271,7 @@ public class TsFileSplitter {
                   alignedChunkData.writeDecodeValuePage(times, values, 
header.getDataType());
                 }
               }
-
+              long pageDataSize = pageHeader.getSerializedPageSize();
               pageIndex += 1;
               dataSize -= pageDataSize;
             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 199d05690b2..d536a357412 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -43,8 +43,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 
@@ -151,11 +153,13 @@ public class LoadSingleTsFileNode extends WritePlanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    // Do nothing
+  }
 
   @Override
   public PlanNode clone() {
@@ -169,14 +173,18 @@ public class LoadSingleTsFileNode extends WritePlanNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Do nothing
+  }
 
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    // Do nothing
+  }
 
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -206,4 +214,25 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       logger.warn(String.format("Delete After Loading %s error.", tsFile), e);
     }
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LoadSingleTsFileNode loadSingleTsFileNode = (LoadSingleTsFileNode) o;
+    return Objects.equals(tsFile, loadSingleTsFileNode.tsFile)
+        && Objects.equals(resource, loadSingleTsFileNode.resource)
+        && Objects.equals(needDecodeTsFile, 
loadSingleTsFileNode.needDecodeTsFile)
+        && Objects.equals(deleteAfterLoad, 
loadSingleTsFileNode.deleteAfterLoad)
+        && Objects.equals(localRegionReplicaSet, 
loadSingleTsFileNode.localRegionReplicaSet);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tsFile, resource, needDecodeTsFile, deleteAfterLoad, 
localRegionReplicaSet);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 1ae27ba569c..f304678fc3c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -28,17 +28,15 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class LoadTsFileNode extends WritePlanNode {
-  private static final Logger logger = 
LoggerFactory.getLogger(LoadTsFileNode.class);
 
   private final List<TsFileResource> resources;
 
@@ -58,11 +56,13 @@ public class LoadTsFileNode extends WritePlanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    // Do nothing
+  }
 
   @Override
   public PlanNode clone() {
@@ -76,14 +76,18 @@ public class LoadTsFileNode extends WritePlanNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Do nothing
+  }
 
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    // Do nothing
+  }
 
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -94,4 +98,21 @@ public class LoadTsFileNode extends WritePlanNode {
     }
     return res;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LoadTsFileNode loadTsFileNode = (LoadTsFileNode) o;
+    return Objects.equals(resources, loadTsFileNode.resources);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(resources);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index 30de17ca433..ab7f9367e8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -42,7 +42,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class LoadTsFilePieceNode extends WritePlanNode {
   private static final Logger logger = 
LoggerFactory.getLogger(LoadTsFilePieceNode.class);
@@ -87,11 +89,13 @@ public class LoadTsFilePieceNode extends WritePlanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    // Do nothing
+  }
 
   @Override
   public PlanNode clone() {
@@ -105,7 +109,7 @@ public class LoadTsFilePieceNode extends WritePlanNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
@@ -161,6 +165,25 @@ public class LoadTsFilePieceNode extends WritePlanNode {
     }
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LoadTsFilePieceNode loadTsFilePieceNode = (LoadTsFilePieceNode) o;
+    return Objects.equals(tsFile, loadTsFilePieceNode.tsFile)
+        && Objects.equals(dataSize, loadTsFilePieceNode.dataSize)
+        && Objects.equals(tsFileDataList, loadTsFilePieceNode.tsFileDataList);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tsFile, dataSize, tsFileDataList);
+  }
+
   @Override
   public String toString() {
     return "LoadTsFilePieceNode{" + "tsFile=" + tsFile + ", dataSize=" + 
dataSize + '}';
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 3be57bc09eb..d23a72c8d32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -93,7 +93,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
               dispatchOneInstance(instance);
             } catch (FragmentInstanceDispatchException e) {
               return new FragInstanceDispatchResult(e.getFailureStatus());
-            } catch (Throwable t) {
+            } catch (Exception t) {
               logger.warn("cannot dispatch FI for load operation", t);
               return new FragInstanceDispatchResult(
                   RpcUtils.getStatus(
@@ -139,15 +139,49 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
         logger.warn(loadResp.message);
         throw new FragmentInstanceDispatchException(loadResp.status);
       }
+    } catch (ClientManagerException | TException e) {
+      String warning = "Can't connect to node {}";
+      logger.warn(warning, endPoint, e);
+      TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+      status.setMessage(warning + endPoint);
+      throw new FragmentInstanceDispatchException(status);
+    }
+  }
+
+  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint 
endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+      if (!loadResp.isAccepted()) {
+        logger.warn(loadResp.message);
+        throw new FragmentInstanceDispatchException(loadResp.status);
+      }
     } catch (ClientManagerException | TException e) {
       logger.warn("can't connect to node {}", endPoint, e);
       TSStatus status = new TSStatus();
       status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage("can't connect to node {}" + endPoint);
+      status.setMessage(
+          "can't connect to node {}, please reset longer 
dn_connection_timeout_ms "
+              + "in iotdb-common.properties and restart iotdb."
+              + endPoint);
       throw new FragmentInstanceDispatchException(status);
     }
   }
 
+  private void dispatchLocally(TLoadCommandReq loadCommandReq)
+      throws FragmentInstanceDispatchException {
+    TSStatus resultStatus =
+        StorageEngine.getInstance()
+            .executeLoadCommand(
+                
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
+                loadCommandReq.uuid);
+    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
+      throw new FragmentInstanceDispatchException(resultStatus);
+    }
+  }
+
   public void dispatchLocally(FragmentInstance instance) throws 
FragmentInstanceDispatchException {
     logger.info("Receive load node from uuid {}.", uuid);
 
@@ -206,7 +240,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
         }
       } catch (FragmentInstanceDispatchException e) {
         return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
-      } catch (Throwable t) {
+      } catch (Exception t) {
         logger.warn("cannot dispatch LoadCommand for load operation", t);
         return immediateFuture(
             new FragInstanceDispatchResult(
@@ -217,38 +251,8 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
     return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
-  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint 
endPoint)
-      throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
-      if (!loadResp.isAccepted()) {
-        logger.warn(loadResp.message);
-        throw new FragmentInstanceDispatchException(loadResp.status);
-      }
-    } catch (ClientManagerException | TException e) {
-      logger.warn("can't connect to node {}", endPoint, e);
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage(
-          "can't connect to node {}, please reset longer 
dn_connection_timeout_ms in iotdb-common.properties and restart iotdb."
-              + endPoint);
-      throw new FragmentInstanceDispatchException(status);
-    }
-  }
-
-  private void dispatchLocally(TLoadCommandReq loadCommandReq)
-      throws FragmentInstanceDispatchException {
-    TSStatus resultStatus =
-        StorageEngine.getInstance()
-            .executeLoadCommand(
-                
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
-                loadCommandReq.uuid);
-    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
-      throw new FragmentInstanceDispatchException(resultStatus);
-    }
-  }
-
   @Override
-  public void abort() {}
+  public void abort() {
+    // Do nothing
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 73ec5260df7..4d8eff3110c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -137,9 +137,8 @@ public class LoadTsFileScheduler implements IScheduler {
       try {
         if (node.isTsFileEmpty()) {
           logger.info(
-              String.format(
-                  "Load skip TsFile %s, because it has no data.",
-                  node.getTsFileResource().getTsFilePath()));
+              "Load skip TsFile {}, because it has no data.",
+              node.getTsFileResource().getTsFilePath());
 
         } else if (!node.needDecodeTsFile(
             partitionFetcher::queryDataPartition)) { // do not decode, load 
locally
@@ -162,15 +161,17 @@ public class LoadTsFileScheduler implements IScheduler {
         }
         if (isLoadSingleTsFileSuccess) {
           logger.info(
-              String.format(
-                  "Load TsFile %s Successfully, load process [%s/%s]",
-                  node.getTsFileResource().getTsFilePath(), i + 1, 
tsFileNodeListSize));
+              "Load TsFile {} Successfully, load process [{}/{}]",
+              node.getTsFileResource().getTsFilePath(),
+              i + 1,
+              tsFileNodeListSize);
         } else {
           isLoadSuccess = false;
           logger.warn(
-              String.format(
-                  "Can not Load TsFile %s, load process [%s/%s]",
-                  node.getTsFileResource().getTsFilePath(), i + 1, 
tsFileNodeListSize));
+              "Can not Load TsFile {}, load process [{}/{}]",
+              node.getTsFileResource().getTsFilePath(),
+              i + 1,
+              tsFileNodeListSize);
         }
       } catch (Exception e) {
         isLoadSuccess = false;
@@ -236,12 +237,12 @@ public class LoadTsFileScheduler implements IScheduler {
       if (!result.isSuccessful()) {
         // TODO: retry.
         logger.warn(
-            String.format(
-                "Dispatch one piece to ReplicaSet %s error. Result status code 
%s. Result status message %s. Dispatch piece node error:%n%s",
-                replicaSet,
-                
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
-                result.getFailureStatus().getMessage(),
-                pieceNode));
+            "Dispatch one piece to ReplicaSet {} error. Result status code {}. 
"
+                + "Result status message {}. Dispatch piece node error:%n{}",
+            replicaSet,
+            
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
+            result.getFailureStatus().getMessage(),
+            pieceNode);
         if (result.getFailureStatus().getSubStatus() != null) {
           for (TSStatus status : result.getFailureStatus().getSubStatus()) {
             logger.warn(
@@ -287,13 +288,13 @@ public class LoadTsFileScheduler implements IScheduler {
       if (!result.isSuccessful()) {
         // TODO: retry.
         logger.warn(
-            String.format(
-                "Dispatch load command %s of TsFile %s error to replicaSets %s 
error. Result status code %s. Result status message %s.",
-                loadCommandReq,
-                tsFile,
-                allReplicaSets,
-                
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
-                result.getFailureStatus().getMessage()));
+            "Dispatch load command {} of TsFile {} error to replicaSets {} 
error. "
+                + "Result status code {}. Result status message {}.",
+            loadCommandReq,
+            tsFile,
+            allReplicaSets,
+            
TSStatusCode.representOf(result.getFailureStatus().getCode()).name(),
+            result.getFailureStatus().getMessage());
         TSStatus status = result.getFailureStatus();
         status.setMessage(
             String.format("Load %s error in 2nd phase. Because ", tsFile) + 
status.getMessage());
@@ -327,7 +328,8 @@ public class LoadTsFileScheduler implements IScheduler {
     } catch (FragmentInstanceDispatchException e) {
       logger.warn(
           String.format(
-              "Dispatch tsFile %s error to local error. Result status code %s. 
Result status message %s.",
+              "Dispatch tsFile %s error to local error. Result status code %s. 
"
+                  + "Result status message %s.",
               node.getTsFileResource().getTsFile(),
               TSStatusCode.representOf(e.getFailureStatus().getCode()).name(),
               e.getFailureStatus().getMessage()));
@@ -338,7 +340,9 @@ public class LoadTsFileScheduler implements IScheduler {
   }
 
   @Override
-  public void stop(Throwable t) {}
+  public void stop(Throwable t) {
+    // Do nothing
+  }
 
   @Override
   public Duration getTotalCpuTime() {
@@ -351,10 +355,14 @@ public class LoadTsFileScheduler implements IScheduler {
   }
 
   @Override
-  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable 
failureCause) {}
+  public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable 
failureCause) {
+    // Do nothing
+  }
 
   @Override
-  public void cancelFragment(PlanFragmentId planFragmentId) {}
+  public void cancelFragment(PlanFragmentId planFragmentId) {
+    // Do nothing
+  }
 
   public enum LoadCommand {
     EXECUTE,

Reply via email to