This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6bb3d93be13 Pipe IT: Fix WALFlushListener.testConcurrentGetValue() &
Fix smells reported by sonar (#11525)
6bb3d93be13 is described below
commit 6bb3d93be13f11fdc0cdcafac21045a03c806b1f
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 14 16:23:00 2023 +0800
Pipe IT: Fix WALFlushListener.testConcurrentGetValue() & Fix smells
reported by sonar (#11525)
---
.../main/java/org/apache/iotdb/CountPointProcessor.java | 15 +++++++--------
.../org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 2 +-
.../db/storageengine/dataregion/wal/node/WALNode.java | 7 +++++--
.../dataregion/wal/utils/WALEntryPosition.java | 8 +++++---
.../dataregion/wal/utils/listener/WALFlushListener.java | 2 +-
.../dataregion/wal/node/WALEntryHandlerTest.java | 9 +++------
6 files changed, 22 insertions(+), 21 deletions(-)
diff --git
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
index 331c5e3d997..feeabce0189 100644
---
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
+++
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
@@ -38,12 +38,12 @@ import java.util.concurrent.atomic.AtomicLong;
public class CountPointProcessor implements PipeProcessor {
private static final String AGGREGATE_SERIES_KEY = "aggregate-series";
- private static AtomicLong writePointCount = new AtomicLong(0);
+ private static final AtomicLong writePointCount = new AtomicLong(0);
private PartialPath aggregateSeries;
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
+ public void validate(PipeParameterValidator validator) {
validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
}
@@ -54,12 +54,9 @@ public class CountPointProcessor implements PipeProcessor {
}
@Override
- public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
- throws Exception {
+ public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector) {
tabletInsertionEvent.processTablet(
- (tablet, rowCollector) -> {
- writePointCount.addAndGet(tablet.rowSize);
- });
+ (tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
}
@Override
@@ -79,5 +76,7 @@ public class CountPointProcessor implements PipeProcessor {
}
@Override
- public void close() throws Exception {}
+ public void close() {
+ // Do nothing
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index eebdb073772..1290e46b94a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -43,9 +43,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+/** Test pipe's basic functionalities under multiple cluster and consensus
protocol settings. */
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-/** Test pipe's basic functionalities under multiple cluster and consensus
protocol settings. */
public class IoTDBPipeProtocolIT {
private BaseEnv senderEnv;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index f3b6cd9c69c..7e35f16d107 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinExc
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -233,7 +234,9 @@ public class WALNode implements IWALNode {
private int recursionTime = 0;
- public DeleteOutdatedFileTask() {}
+ public DeleteOutdatedFileTask() {
+ // Do nothing
+ }
private void init() {
this.firstValidVersionId = initFirstValidWALVersionId();
@@ -928,7 +931,7 @@ public class WALNode implements IWALNode {
public void rollWALFile() {
WALEntry rollWALFileSignal = new
WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
WALFlushListener walFlushListener = log(rollWALFileSignal);
- if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+ if (walFlushListener.waitForResult() ==
AbstractResultListener.Status.FAILURE) {
logger.error(
"Fail to trigger rolling wal node-{}'s wal file log writer.",
identifier,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 3979c49bffc..4ef1aa9ce6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -47,6 +47,8 @@ public class WALEntryPosition {
// cache for wal entry
private WALInsertNodeCache cache = null;
+ private static final String ENTRY_NOT_READY_MESSAGE = "This entry isn't
ready for read.";
+
public WALEntryPosition() {}
public WALEntryPosition(String identifier, long walFileVersionId, long
position, int size) {
@@ -71,7 +73,7 @@ public class WALEntryPosition {
*/
public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException {
if (!canRead()) {
- throw new IOException("This entry isn't ready for read.");
+ throw new IOException(ENTRY_NOT_READY_MESSAGE);
}
return cache.getInsertNode(this);
}
@@ -83,7 +85,7 @@ public class WALEntryPosition {
*/
public ByteBuffer readByteBufferViaCacheAfterCanRead() throws IOException {
if (!canRead()) {
- throw new IOException("This entry isn't ready for read.");
+ throw new IOException(ENTRY_NOT_READY_MESSAGE);
}
return cache.getByteBuffer(this);
}
@@ -144,7 +146,7 @@ public class WALEntryPosition {
/** Return true only when this wal file is sealed. */
public boolean isInSealedFile() {
if (walNode == null || !canRead()) {
- throw new RuntimeException("This entry isn't ready for read.");
+ throw new RuntimeException(ENTRY_NOT_READY_MESSAGE);
}
return walFileVersionId < walNode.getCurrentWALFileVersion();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java
index 9c7ba908d1e..8c84a0cb0fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
/** This class helps judge whether wal is flushed to the storage device. */
public class WALFlushListener extends AbstractResultListener {
- // handler for pipeline, only exists then value is InsertNode
+ // handler for pipeline, only exists when value is InsertNode
private final WALEntryHandler walEntryHandler;
public WALFlushListener(boolean wait, WALEntryValue value) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 45f65357347..48d2f60ff67 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -234,9 +235,7 @@ public class WALEntryHandlerTest {
handler.pinMemTable();
walNode1.onMemTableFlushed(memTable);
// wait until wal flushed
- while (!walNode1.isAllWALEntriesConsumed()) {
- Thread.sleep(50);
- }
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
assertEquals(node1, handler.getInsertNode());
}
@@ -269,9 +268,7 @@ public class WALEntryHandlerTest {
}
// wait until wal flushed
- while (!walNode1.isAllWALEntriesConsumed() &&
!walNode2.isAllWALEntriesConsumed()) {
- Thread.sleep(50);
- }
+ Awaitility.await().until(walNode::isAllWALEntriesConsumed);
walFlushListeners.get(0).getWalEntryHandler().pinMemTable();
walNode.onMemTableFlushed(memTable);