This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 33aa5068ea2 apply spotless
33aa5068ea2 is described below
commit 33aa5068ea2607a260ec922c52a203f83b731420
Author: Tian Jiang <[email protected]>
AuthorDate: Fri May 19 11:23:59 2023 +0800
apply spotless
---
.../consensus/natraft/protocol/RaftConfig.java | 4 +---
.../consensus/natraft/protocol/RaftMember.java | 3 ++-
.../protocol/log/dispatch/DispatcherThread.java | 10 ++++----
.../protocol/log/dispatch/VotingLogList.java | 2 --
.../protocol/log/logtype/ConfigChangeEntry.java | 6 +++--
.../natraft/protocol/log/logtype/EmptyEntry.java | 6 +++--
.../natraft/protocol/log/logtype/RequestEntry.java | 11 ++++-----
.../manager/DirectorySnapshotRaftLogManager.java | 12 ++++++++--
.../protocol/log/manager/RaftLogManager.java | 11 ++++-----
.../protocol/log/recycle/EntryAllocator.java | 28 ++++++++++++++++++----
10 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index ac9423494ff..ffe0425fc50 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -700,12 +700,10 @@ public class RaftConfig {
"entry_serialization_buffer_size",
String.valueOf(this.getEntryDefaultSerializationBufferSize()))));
-
this.setEntryAllocatorCapacity(
Integer.parseInt(
properties.getProperty(
- "entry_allocator_capacity",
- String.valueOf(this.getEntryAllocatorCapacity()))));
+ "entry_allocator_capacity", String.valueOf(this.getEntryAllocatorCapacity()))));
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 2a395a3f45e..b460fbc5a10 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -644,7 +644,8 @@ public class RaftMember {
}
logger.debug("{}: Processing request {}", name, request);
- Entry entry = requestEntryAllocator.Allocate();
+ RequestEntry entry = requestEntryAllocator.Allocate();
+ entry.setRequest(request);
entry.preSerialize();
entry.receiveTime = System.nanoTime();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 3fdb6e1059d..60746935feb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -124,9 +124,8 @@ class DispatcherThread extends DynamicThread {
protected void serializeEntries() throws InterruptedException {
for (VotingEntry request : currBatch) {
-
- request.getAppendEntryRequest().entry = request.getEntry().serialize();
- request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
+ ByteBuffer serialized = request.getEntry().serialize();
+ request.getEntry().setByteSize(serialized.remaining());
}
}
@@ -228,12 +227,13 @@ class DispatcherThread extends DynamicThread {
for (; logIndex < currBatch.size(); logIndex++) {
VotingEntry entry = currBatch.get(logIndex);
- long curSize = entry.getAppendEntryRequest().entry.remaining();
+ ByteBuffer serialized = entry.getEntry().serialize();
+ long curSize = serialized.remaining();
if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
break;
}
logSize += curSize;
- logList.add(entry.getAppendEntryRequest().entry);
+ logList.add(serialized);
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
entry.getEntry().createTime);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 527a2ec0509..ba06f33c7be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 26ed266509c..abf957d48b8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.logtype;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -28,7 +29,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
public class ConfigChangeEntry extends Entry {
@@ -43,7 +43,9 @@ public class ConfigChangeEntry extends Entry {
@Override
protected ByteBuffer serializeInternal(byte[] buffer) {
ByteArrayOutputStream byteArrayOutputStream =
- buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
+ buffer == null
+ ? new PublicBAOS(getDefaultSerializationBufferSize())
+ : new PublicBAOS(buffer);
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
index 231a83b4e0c..0549080b67f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
@@ -20,12 +20,12 @@
package org.apache.iotdb.consensus.natraft.protocol.log.logtype;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
public class EmptyEntry extends Entry {
@@ -39,7 +39,9 @@ public class EmptyEntry extends Entry {
@Override
protected ByteBuffer serializeInternal(byte[] buffer) {
ByteArrayOutputStream byteArrayOutputStream =
- buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
+ buffer == null
+ ? new PublicBAOS(getDefaultSerializationBufferSize())
+ : new PublicBAOS(buffer);
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index f6cb9ec3707..003f9d8a2a6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -34,16 +34,13 @@ import java.util.Objects;
import static org.apache.iotdb.consensus.natraft.protocol.log.Entry.Types.CLIENT_REQUEST;
-/**
- * RequestLog contains a non-partitioned request like set storage group.
- */
+/** RequestLog contains a non-partitioned request like set storage group. */
public class RequestEntry extends Entry {
private static final Logger logger = LoggerFactory.getLogger(RequestEntry.class);
private volatile IConsensusRequest request;
- public RequestEntry() {
- }
+ public RequestEntry() {}
public RequestEntry(IConsensusRequest request) {
setRequest(request);
@@ -52,7 +49,9 @@ public class RequestEntry extends Entry {
@Override
protected ByteBuffer serializeInternal(byte[] buffer) {
PublicBAOS byteArrayOutputStream =
- buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
+ buffer == null
+ ? new PublicBAOS(getDefaultSerializationBufferSize())
+ : new PublicBAOS(buffer);
int requestSize = 0;
int requestPos = 0;
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index 2cd4e39203a..3c54da1309e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
-import java.util.function.Supplier;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -34,6 +33,7 @@ import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;
+import java.util.function.Supplier;
public class DirectorySnapshotRaftLogManager extends RaftLogManager {
@@ -51,7 +51,15 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
Consumer<List<Entry>> unappliedEntryExaminer,
Supplier<Long> safeIndexProvider,
Consumer<Entry> entryRecycler) {
- super(stableEntryManager, applier, name, stateMachine, config, unappliedEntryExaminer, safeIndexProvider, entryRecycler);
+ super(
+ stableEntryManager,
+ applier,
+ name,
+ stateMachine,
+ config,
+ unappliedEntryExaminer,
+ safeIndexProvider,
+ entryRecycler);
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 0c5fc6e936a..aa67fc7641f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
-import java.util.function.Supplier;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.IStateMachine;
@@ -50,6 +49,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.function.Supplier;
public abstract class RaftLogManager {
@@ -117,7 +117,8 @@ public abstract class RaftLogManager {
String name,
IStateMachine stateMachine,
RaftConfig config,
- Consumer<List<Entry>> unappliedEntryExaminer, Supplier<Long> safeIndexProvider,
+ Consumer<List<Entry>> unappliedEntryExaminer,
+ Supplier<Long> safeIndexProvider,
Consumer<Entry> entryRecycler) {
this.logApplier = applier;
this.name = name;
@@ -208,14 +209,10 @@ public abstract class RaftLogManager {
* applied before take snapshot
*
* <p>
- *
*/
public abstract void takeSnapshot(RaftMember member);
- /**
- * Update the raftNode's hardState(currentTerm,voteFor) and flush to disk.
- *
- */
+ /** Update the raftNode's hardState(currentTerm,voteFor) and flush to disk. */
public void updateHardState(HardState state) {
getStableEntryManager().setHardStateAndFlush(state);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
index a603624d764..55795bc18a0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
@@ -1,10 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.consensus.natraft.protocol.log.recycle;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Supplier;
-import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
-import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
public class EntryAllocator<T extends Entry> {
private Queue<T> entryPool;
@@ -12,7 +32,8 @@ public class EntryAllocator<T extends Entry> {
private Queue<T> recyclingEntries;
private Supplier<Long> safeIndexProvider;
- public EntryAllocator(RaftConfig config, Supplier<T> entryFactory, Supplier<Long> safeIndexProvider) {
+ public EntryAllocator(
+ RaftConfig config, Supplier<T> entryFactory, Supplier<Long> safeIndexProvider) {
this.entryPool = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity());
this.recyclingEntries = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity() / 2);
this.entryFactory = entryFactory;
@@ -54,5 +75,4 @@ public class EntryAllocator<T extends Entry> {
}
}
}
-
}