This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_sonar_issue_0704 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a94df322b9f794d1808fe64bde95b5bf7f1678d1 Author: HTHou <[email protected]> AuthorDate: Thu Jul 4 15:20:53 2024 +0800 Fix sonar issues --- .../java/org/apache/iotdb/tool/ImportTsFile.java | 6 ++- .../iotdb/rpc/subscription/config/TopicConfig.java | 8 +--- .../SubscriptionIdentifierSemanticException.java | 5 +++ .../consumer/SubscriptionConsumer.java | 44 ++++++++++------------ .../conf/ConfigNodeSystemPropertiesHandler.java | 7 +--- .../iotdb/confignode/manager/ConfigManager.java | 5 ++- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 9 +++++ .../batch/PipeTabletEventTsFileBatch.java | 2 +- .../pipeconsensus/PipeConsensusReceiver.java | 17 +++++---- .../broker/SubscriptionPrefetchingQueue.java | 40 +++++++++----------- .../connector/limiter/PipeEndPointRateLimiter.java | 2 +- .../task/subtask/PipeAbstractConnectorSubtask.java | 2 +- 12 files changed, 76 insertions(+), 71 deletions(-) diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java index 15cdd907f8b..634f77bdbe5 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java @@ -406,7 +406,11 @@ public class ImportTsFile extends AbstractTsFileTool { if (Objects.nonNull(e.getMessage()) && e.getMessage().contains("memory")) { ioTPrinter.println( "Rejecting file [ " + filePath + " ] due to memory constraints, will retry later."); - tsfileQueue.put(filePath); + try { + tsfileQueue.put(filePath); + } catch (InterruptedException exception) { + Thread.currentThread().interrupt(); + } continue; } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java index 2509213446e..617722b70d6 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java @@ -26,6 +26,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -58,12 +59,7 @@ public class TopicConfig extends PipeParameters { private static final Set<String> LOOSE_RANGE_KEY_SET = Collections.unmodifiableSet( - new HashSet<String>() { - { - add("history.loose-range"); - add("realtime.loose-range"); - } - }); + new HashSet<>(Arrays.asList("history.loose-range", "realtime.loose-range"))); /////////////////////////////// de/ser /////////////////////////////// diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java index 6eb9ba6d734..dd10d428023 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java @@ -39,4 +39,9 @@ public class SubscriptionIdentifierSemanticException extends SubscriptionExcepti && Objects.equals( getTimeStamp(), ((SubscriptionIdentifierSemanticException) obj).getTimeStamp()); } + + @Override + public int hashCode() { + return super.hashCode(); + } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index d54ed406774..e3e5d6c5950 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -101,7 +101,7 @@ abstract class SubscriptionConsumer implements AutoCloseable { private final String fileSaveDir; private final boolean fileSaveFsync; - protected volatile Map<String, TopicConfig> subscribedTopics = new HashMap<>(); + protected Map<String, TopicConfig> subscribedTopics = new HashMap<>(); public boolean allSnapshotTopicMessagesHaveBeenConsumed() { return subscribedTopics.values().stream() @@ -1038,31 +1038,27 @@ abstract class SubscriptionConsumer implements AutoCloseable { /////////////////////////////// stringify /////////////////////////////// protected Map<String, String> coreReportMessage() { - return new HashMap<String, String>() { - { - put("consumerId", consumerId); - put("consumerGroupId", consumerGroupId); - put("isClosed", isClosed.toString()); - put("fileSaveDir", fileSaveDir); - put("subscribedTopicNames", subscribedTopics.keySet().toString()); - } - }; + Map<String, String> reportMessage = new HashMap<>(); + reportMessage.put("consumerId", consumerId); + reportMessage.put("consumerGroupId", consumerGroupId); + reportMessage.put("isClosed", isClosed.toString()); + reportMessage.put("fileSaveDir", fileSaveDir); + reportMessage.put("subscribedTopicNames", subscribedTopics.keySet().toString()); + return reportMessage; } protected Map<String, String> allReportMessage() { - return new HashMap<String, String>() { - { - put("consumerId", consumerId); - put("consumerGroupId", consumerGroupId); - put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs)); - put("endpointsSyncIntervalMs", String.valueOf(endpointsSyncIntervalMs)); - put("providers", providers.toString()); - put("isClosed", isClosed.toString()); - put("isReleased", isReleased.toString()); - put("fileSaveDir", fileSaveDir); - put("fileSaveFsync", String.valueOf(fileSaveFsync)); - put("subscribedTopics", subscribedTopics.toString()); - } - }; + Map<String, String> reportMessage = new HashMap<>(); + reportMessage.put("consumerId", consumerId); + reportMessage.put("consumerGroupId", consumerGroupId); + reportMessage.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs)); + reportMessage.put("endpointsSyncIntervalMs", String.valueOf(endpointsSyncIntervalMs)); + reportMessage.put("providers", providers.toString()); + reportMessage.put("isClosed", isClosed.toString()); + reportMessage.put("isReleased", isReleased.toString()); + reportMessage.put("fileSaveDir", fileSaveDir); + reportMessage.put("fileSaveFsync", String.valueOf(fileSaveFsync)); + reportMessage.put("subscribedTopics", subscribedTopics.toString()); + return reportMessage; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeSystemPropertiesHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeSystemPropertiesHandler.java index 2fd9969fd71..0b05d740780 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeSystemPropertiesHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeSystemPropertiesHandler.java @@ -21,16 +21,11 @@ package org.apache.iotdb.confignode.conf; import org.apache.iotdb.commons.file.SystemPropertiesHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; public class ConfigNodeSystemPropertiesHandler extends SystemPropertiesHandler { - private static final Logger LOGGER = - LoggerFactory.getLogger(ConfigNodeSystemPropertiesHandler.class); - private static ConfigNodeSystemPropertiesHandler INSTANCE; + private static volatile ConfigNodeSystemPropertiesHandler INSTANCE; private ConfigNodeSystemPropertiesHandler(String filePath) { super(filePath); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 0f8271d27de..cce0000e255 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1502,6 +1502,9 @@ public class ConfigManager implements IManager { properties.putAll(req.getConfigs()); try { ConfigurationFileUtils.updateConfigurationFile(file, properties); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); } catch (Exception e) { return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); } @@ -1900,7 +1903,7 @@ public class ConfigManager implements IManager { PathPatternTree rawPatternTree = PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree())); boolean isGeneratedByPipe = req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe(); - /** + /* * If delete pattern is prefix path (such as root.db.**), it may be optimized to delete * database plus create database. We need to determine two conditions: whether the pattern * ends in **, and that the device is a full path and is matched in the ConfigMTree. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c38c17535fa..bdbd11385da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1334,6 +1334,15 @@ public class IoTDBConfig { void reloadDataDirs(String[][] tierDataDirs) throws LoadConfigurationException { // format data directories formulateDataDirs(tierDataDirs); + // make sure the tiers number not reduced + if (this.tierDataDirs.length > tierDataDirs.length) { + String msg = + String.format( + "The tiers number is reduced from %d from %d please add it back.", + this.tierDataDirs.length, tierDataDirs.length); + logger.error(msg); + throw new LoadConfigurationException(msg); + } // make sure old data directories not removed for (int i = 0; i < this.tierDataDirs.length; ++i) { List<String> newDirs = Arrays.asList(tierDataDirs[i]); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index e02f2d7bbb0..03ab0ac6339 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -77,7 +77,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { private final List<Tablet> tabletList = new ArrayList<>(); private final List<Boolean> isTabletAlignedList = new ArrayList<>(); - private volatile TsFileWriter fileWriter; + private TsFileWriter fileWriter; public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) { super(maxDelayInMs); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 0f5af27e0a7..cb46b380ab5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -79,6 +79,7 @@ import java.util.Objects; import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -1197,8 +1198,8 @@ public class PipeConsensusReceiver { private final PipeConsensusTsFileWriterPool tsFileWriterPool; private long onSyncedCommitIndex = 0; private int connectorRebootTimes = 0; - private volatile int WALEventCount = 0; - private volatile int tsFileEventCount = 0; + private final AtomicInteger WALEventCount = new AtomicInteger(0); + private final AtomicInteger tsFileEventCount = new AtomicInteger(0); public RequestExecutor( PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool tsFileWriterPool) { @@ -1221,10 +1222,10 @@ public class PipeConsensusReceiver { onSyncedCommitIndex = nextSyncedCommitIndex; // update metric, notice that curMeta is never null. if (isTransferTsFileSeal) { - tsFileEventCount--; + tsFileEventCount.getAndDecrement(); metric.recordReceiveTsFileTimer(System.nanoTime() - curMeta.getStartApplyNanos()); } else { - WALEventCount--; + WALEventCount.getAndDecrement(); metric.recordReceiveWALTimer(System.nanoTime() - curMeta.getStartApplyNanos()); } } @@ -1273,10 +1274,10 @@ public class PipeConsensusReceiver { // update metric if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) { // only update tsFileEventCount when tsFileEvent is first enqueue. - tsFileEventCount++; + tsFileEventCount.getAndIncrement(); } if (!isTransferTsFileSeal && !isTransferTsFilePiece) { - WALEventCount++; + WALEventCount.getAndIncrement(); } reqExecutionOrderBuffer.add(requestMeta); @@ -1456,11 +1457,11 @@ public class PipeConsensusReceiver { } public int getWALEventCount() { - return this.requestExecutor.WALEventCount; + return this.requestExecutor.WALEventCount.get(); } public int getTsFileEventCount() { - return this.requestExecutor.tsFileEventCount; + return this.requestExecutor.tsFileEventCount.get(); } public String getConsensusGroupIdStr() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index df06665f162..42604c727ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -339,30 +339,26 @@ public abstract class SubscriptionPrefetchingQueue { /////////////////////////////// stringify /////////////////////////////// protected Map<String, String> coreReportMessage() { - return new HashMap<String, String>() { - { - put("brokerId", brokerId); - put("topicName", topicName); - put("size of uncommittedEvents", String.valueOf(uncommittedEvents.size())); - put("subscriptionCommitIdGenerator", subscriptionCommitIdGenerator.toString()); - put("isCompleted", String.valueOf(isCompleted)); - put("isClosed", String.valueOf(isClosed)); - } - }; + Map<String, String> reportMessage = new HashMap<>(); + reportMessage.put("brokerId", brokerId); + reportMessage.put("topicName", topicName); + reportMessage.put("size of uncommittedEvents", String.valueOf(uncommittedEvents.size())); + reportMessage.put("subscriptionCommitIdGenerator", subscriptionCommitIdGenerator.toString()); + reportMessage.put("isCompleted", String.valueOf(isCompleted)); + reportMessage.put("isClosed", String.valueOf(isClosed)); + return reportMessage; } protected Map<String, String> allReportMessage() { - return new HashMap<String, String>() { - { - put("brokerId", brokerId); - put("topicName", topicName); - put("size of inputPendingQueue", String.valueOf(inputPendingQueue.size())); - put("size of prefetchingQueue", String.valueOf(prefetchingQueue.size())); - put("uncommittedEvents", uncommittedEvents.toString()); - put("subscriptionCommitIdGenerator", subscriptionCommitIdGenerator.toString()); - put("isCompleted", String.valueOf(isCompleted)); - put("isClosed", String.valueOf(isClosed)); - } - }; + Map<String, String> reportMessage = new HashMap<>(); + reportMessage.put("brokerId", brokerId); + reportMessage.put("topicName", topicName); + reportMessage.put("size of inputPendingQueue", String.valueOf(inputPendingQueue.size())); + reportMessage.put("size of prefetchingQueue", String.valueOf(prefetchingQueue.size())); + reportMessage.put("uncommittedEvents", uncommittedEvents.toString()); + reportMessage.put("subscriptionCommitIdGenerator", subscriptionCommitIdGenerator.toString()); + reportMessage.put("isCompleted", String.valueOf(isCompleted)); + reportMessage.put("isClosed", String.valueOf(isClosed)); + return reportMessage; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java index 46f16a8eb11..008c4d60fbd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit; public class PipeEndPointRateLimiter { // The task agent is used to check if the pipe is still alive - private static volatile PipeTaskAgent taskAgent; + private static PipeTaskAgent taskAgent; private final String pipeName; private final long creationTime; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java index e6a2399f7e8..5fb1eadd8db 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java @@ -50,7 +50,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask protected volatile boolean isSubmitted = false; // For cleaning up the last event when the pipe is dropped - protected volatile Event lastExceptionEvent; + protected Event lastExceptionEvent; protected PipeAbstractConnectorSubtask( final String taskID, final long creationTime, final PipeConnector outputPipeConnector) {
