This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cf0d3dd93e3 Fix Concurrency Instability When IoTConsensus LogDispatcher Exits #12655
cf0d3dd93e3 is described below
commit cf0d3dd93e376476515745be68e90c1c722ed126
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Jun 4 21:00:14 2024 +0800
Fix Concurrency Instability When IoTConsensus LogDispatcher Exits #12655
---
.../consensus/iot/logdispatcher/LogDispatcher.java | 34 +++++-------
.../apache/iotdb/consensus/iot/ReplicateTest.java | 62 ++++------------------
2 files changed, 21 insertions(+), 75 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index d7424f136d7..f5de226cc41 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,12 +46,9 @@ import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -101,7 +98,7 @@ public class LogDispatcher {
public synchronized void start() {
if (!threads.isEmpty()) {
- threads.forEach(thread ->
thread.setFuture(executorService.submit(thread)));
+ threads.forEach(executorService::submit);
}
}
@@ -133,7 +130,7 @@ public class LogDispatcher {
if (this.executorService == null) {
initLogSyncThreadPool();
}
- thread.setFuture(executorService.submit(thread));
+ executorService.submit(thread);
}
public synchronized void removeLogDispatcherThread(Peer peer) throws
IOException {
@@ -231,7 +228,7 @@ public class LogDispatcher {
private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
- private Future<?> future;
+ private Semaphore threadSemaphore = new Semaphore(0);
public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long
initialSyncIndex) {
this.peer = peer;
@@ -257,10 +254,6 @@ public class LogDispatcher {
return controller.getCurrentIndex();
}
- public void setFuture(Future<?> future) {
- this.future = future;
- }
-
public long getLastFlushedSyncIndex() {
return controller.getLastFlushedIndex();
}
@@ -308,16 +301,12 @@ public class LogDispatcher {
public void stop() {
stopped = true;
- if (!future.cancel(true)) {
- logger.warn("LogDispatcherThread Future for {} is not stopped", peer);
- }
try {
- future.get(30, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e)
{
+ if (!threadSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
+ logger.error("{}: Dispatcher for {} didn't stop after 30s.",
impl.getThisNode(), peer);
+ }
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.warn("LogDispatcherThread Future for {} is not stopped", peer,
e);
- } catch (CancellationException ignored) {
- // ignore because it is expected
}
long requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
@@ -351,7 +340,7 @@ public class LogDispatcher {
logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
try {
Batch batch;
- while (!Thread.interrupted()) {
+ while (!Thread.interrupted() && !stopped) {
long startTime = System.nanoTime();
while ((batch = getBatch()).isEmpty()) {
// we may block here if there is no requests in the queue
@@ -366,12 +355,12 @@ public class LogDispatcher {
}
}
// Immediately check for interrupts after poll and sleep
- if (Thread.interrupted()) {
+ if (Thread.interrupted() || stopped) {
throw new InterruptedException("Interrupted after polling and
sleeping");
}
}
// Immediately check for interrupts after a time-consuming
getBatch() operation
- if (Thread.interrupted()) {
+ if (Thread.interrupted() || stopped) {
throw new InterruptedException("Interrupted after getting a
batch");
}
logDispatcherThreadMetrics.recordConstructBatchTime(System.nanoTime() -
startTime);
@@ -388,6 +377,7 @@ public class LogDispatcher {
} catch (Exception e) {
logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
}
+ threadSemaphore.release();
logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 610692a73e1..6e2326dd546 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -33,21 +33,17 @@ import org.apache.iotdb.consensus.iot.util.TestEntry;
import org.apache.iotdb.consensus.iot.util.TestStateMachine;
import org.apache.ratis.util.FileUtils;
-import org.apache.tsfile.utils.PublicBAOS;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -102,42 +98,10 @@ public class ReplicateTest {
}
}
- public void changeConfiguration(int i) {
- try (PublicBAOS publicBAOS = new PublicBAOS();
- DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
- outputStream.writeInt(this.peers.size());
- for (Peer peer : this.peers) {
- peer.serialize(outputStream);
- }
- File storageDir = new
File(IoTConsensus.buildPeerDir(peersStorage.get(i), gid));
- Path tmpConfigurationPath =
- Paths.get(new File(storageDir,
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
- Path configurationPath =
- Paths.get(new File(storageDir,
CONFIGURATION_FILE_NAME).getAbsolutePath());
- if (!Files.exists(configurationPath) &&
!Files.exists(tmpConfigurationPath)) {
- return;
- }
- if (!Files.exists(tmpConfigurationPath)) {
- Files.createDirectories(tmpConfigurationPath.getParent());
- Files.createFile(tmpConfigurationPath);
- }
- Files.write(tmpConfigurationPath, publicBAOS.getBuf());
- if (Files.exists(configurationPath)) {
- Files.delete(configurationPath);
- }
- Files.move(tmpConfigurationPath, configurationPath);
- } catch (IOException e) {
- logger.error("Unexpected error occurs when persisting configuration", e);
- }
- }
-
private void initServer() throws IOException {
- for (int i = 0; i < peers.size(); i++) {
- findPortAvailable(i);
- }
+ Assume.assumeTrue(checkPortAvailable());
for (int i = 0; i < peers.size(); i++) {
int finalI = i;
- changeConfiguration(i);
servers.add(
(IoTConsensus)
ConsensusFactory.getConsensusImpl(
@@ -363,23 +327,15 @@ public class ReplicateTest {
}
}
- private void findPortAvailable(int i) {
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < timeout) {
- try (ServerSocket ignored = new
ServerSocket(this.peers.get(i).getEndpoint().port)) {
- // success
- return;
+ private boolean checkPortAvailable() {
+ for (Peer peer : this.peers) {
+ try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) {
+ logger.info("check port {} success for node {}",
peer.getEndpoint().port, peer.getNodeId());
} catch (IOException e) {
- // Port is already in use, wait and retry
- this.peers.set(i, new Peer(gid, i + 1, new TEndPoint("127.0.0.1",
this.basePort)));
- logger.info("try port {} for node {}.", this.basePort++, i + 1);
- try {
- Thread.sleep(50); // Wait for 1 second before retrying
- } catch (InterruptedException ex) {
- // Handle the interruption if needed
- }
+ logger.error("check port {} failed for node {}",
peer.getEndpoint().port, peer.getNodeId());
+ return false;
}
}
- Assert.fail(String.format("can not find port for node %d after 300s", i +
1));
+ return true;
}
}