This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bb3d93be13 Pipe IT: Fix WALFlushListener.testConcurrentGetValue() & Fix smells reported by sonar  (#11525)
6bb3d93be13 is described below

commit 6bb3d93be13f11fdc0cdcafac21045a03c806b1f
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 14 16:23:00 2023 +0800

    Pipe IT: Fix WALFlushListener.testConcurrentGetValue() & Fix smells reported by sonar  (#11525)
---
 .../main/java/org/apache/iotdb/CountPointProcessor.java   | 15 +++++++--------
 .../org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java     |  2 +-
 .../db/storageengine/dataregion/wal/node/WALNode.java     |  7 +++++--
 .../dataregion/wal/utils/WALEntryPosition.java            |  8 +++++---
 .../dataregion/wal/utils/listener/WALFlushListener.java   |  2 +-
 .../dataregion/wal/node/WALEntryHandlerTest.java          |  9 +++------
 6 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
index 331c5e3d997..feeabce0189 100644
--- a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
+++ b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
@@ -38,12 +38,12 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class CountPointProcessor implements PipeProcessor {
   private static final String AGGREGATE_SERIES_KEY = "aggregate-series";
-  private static AtomicLong writePointCount = new AtomicLong(0);
+  private static final AtomicLong writePointCount = new AtomicLong(0);
 
   private PartialPath aggregateSeries;
 
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
+  public void validate(PipeParameterValidator validator) {
     validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
   }
 
@@ -54,12 +54,9 @@ public class CountPointProcessor implements PipeProcessor {
   }
 
   @Override
-  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
-      throws Exception {
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) {
     tabletInsertionEvent.processTablet(
-        (tablet, rowCollector) -> {
-          writePointCount.addAndGet(tablet.rowSize);
-        });
+        (tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
   }
 
   @Override
@@ -79,5 +76,7 @@ public class CountPointProcessor implements PipeProcessor {
   }
 
   @Override
-  public void close() throws Exception {}
+  public void close() {
+    // Do nothing
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index eebdb073772..1290e46b94a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -43,9 +43,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */
 public class IoTDBPipeProtocolIT {
 
   private BaseEnv senderEnv;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index f3b6cd9c69c..7e35f16d107 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinExc
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -233,7 +234,9 @@ public class WALNode implements IWALNode {
 
     private int recursionTime = 0;
 
-    public DeleteOutdatedFileTask() {}
+    public DeleteOutdatedFileTask() {
+      // Do nothing
+    }
 
     private void init() {
       this.firstValidVersionId = initFirstValidWALVersionId();
@@ -928,7 +931,7 @@ public class WALNode implements IWALNode {
   public void rollWALFile() {
     WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
     WALFlushListener walFlushListener = log(rollWALFileSignal);
-    if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+    if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) {
       logger.error(
           "Fail to trigger rolling wal node-{}'s wal file log writer.",
           identifier,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 3979c49bffc..4ef1aa9ce6b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -47,6 +47,8 @@ public class WALEntryPosition {
   // cache for wal entry
   private WALInsertNodeCache cache = null;
 
+  private static final String ENTRY_NOT_READY_MESSAGE = "This entry isn't ready for read.";
+
   public WALEntryPosition() {}
 
   public WALEntryPosition(String identifier, long walFileVersionId, long position, int size) {
@@ -71,7 +73,7 @@ public class WALEntryPosition {
    */
   public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException {
     if (!canRead()) {
-      throw new IOException("This entry isn't ready for read.");
+      throw new IOException(ENTRY_NOT_READY_MESSAGE);
     }
     return cache.getInsertNode(this);
   }
@@ -83,7 +85,7 @@ public class WALEntryPosition {
    */
   public ByteBuffer readByteBufferViaCacheAfterCanRead() throws IOException {
     if (!canRead()) {
-      throw new IOException("This entry isn't ready for read.");
+      throw new IOException(ENTRY_NOT_READY_MESSAGE);
     }
     return cache.getByteBuffer(this);
   }
@@ -144,7 +146,7 @@ public class WALEntryPosition {
   /** Return true only when this wal file is sealed. */
   public boolean isInSealedFile() {
     if (walNode == null || !canRead()) {
-      throw new RuntimeException("This entry isn't ready for read.");
+      throw new RuntimeException(ENTRY_NOT_READY_MESSAGE);
     }
     return walFileVersionId < walNode.getCurrentWALFileVersion();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java
index 9c7ba908d1e..8c84a0cb0fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
 /** This class helps judge whether wal is flushed to the storage device. */
 public class WALFlushListener extends AbstractResultListener {
-  // handler for pipeline, only exists then value is InsertNode
+  // handler for pipeline, only exists when value is InsertNode
   private final WALEntryHandler walEntryHandler;
 
   public WALFlushListener(boolean wait, WALEntryValue value) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 45f65357347..48d2f60ff67 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -234,9 +235,7 @@ public class WALEntryHandlerTest {
     handler.pinMemTable();
     walNode1.onMemTableFlushed(memTable);
     // wait until wal flushed
-    while (!walNode1.isAllWALEntriesConsumed()) {
-      Thread.sleep(50);
-    }
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
     assertEquals(node1, handler.getInsertNode());
   }
 
@@ -269,9 +268,7 @@ public class WALEntryHandlerTest {
             }
 
             // wait until wal flushed
-            while (!walNode1.isAllWALEntriesConsumed() && !walNode2.isAllWALEntriesConsumed()) {
-              Thread.sleep(50);
-            }
+            Awaitility.await().until(walNode::isAllWALEntriesConsumed);
 
             walFlushListeners.get(0).getWalEntryHandler().pinMemTable();
             walNode.onMemTableFlushed(memTable);

Reply via email to