This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch geely_car_0205_confignode in repository https://gitbox.apache.org/repos/asf/iotdb.git.
commit c74e3e2b18c0ce3096f241889c9e3d8e19f1bc9f Author: Beyyes <cgf1...@foxmail.com> AuthorDate: Sun Feb 5 16:23:32 2023 +0800 fix conflict --- .../request/write/SimpleConsensusLogWriter.java | 3 + .../statemachine/ConfigNodeRegionStateMachine.java | 20 ++--- .../iotdb/confignode/manager/ConfigManager.java | 90 +++------------------- 3 files changed, 19 insertions(+), 94 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java new file mode 100644 index 0000000000..313f71ef92 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/SimpleConsensusLogWriter.java @@ -0,0 +1,3 @@ +package org.apache.iotdb.confignode.consensus.request.write; + +public class SimpleConsensusLogWriter {} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java index 16d75de30b..72e68c1cfe 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java @@ -51,6 +51,7 @@ import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** StateMachine for ConfigNodeRegion */ @@ -68,6 +69,7 @@ public class ConfigNodeRegionStateMachine private File logFile; private int startIndex; private int endIndex; + private ScheduledExecutorService executorService; private static final String CURRENT_FILE_DIR = CONF.getConsensusDir() + File.separator + "simple" + 
File.separator + "current"; @@ -302,15 +304,10 @@ public class ConfigNodeRegionStateMachine try { ByteBuffer buffer = plan.serializeToByteBuffer(); - // The method logWriter.write will execute flip() firstly, so we must make position==limit buffer.position(buffer.limit()); logWriter.write(buffer); - logWriter.close(); + endIndex = endIndex + 1; - File tmpLogFile = new File(PROGRESS_FILE_PATH + endIndex); - Files.move(logFile.toPath(), tmpLogFile.toPath(), StandardCopyOption.ATOMIC_MOVE); - logFile = tmpLogFile; - logWriter = new LogWriter(logFile, false); } catch (Exception e) { LOGGER.error( "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e); @@ -323,14 +320,6 @@ public class ConfigNodeRegionStateMachine String[] list = new File(CURRENT_FILE_DIR).list(); if (list != null && list.length != 0) { for (String logFileName : list) { - int tmp = Integer.parseInt(logFileName.substring(logFileName.lastIndexOf("_") + 1)); - if (logFileName.startsWith("log_inprogress")) { - endIndex = tmp; - } else { - if (startIndex < tmp) { - startIndex = tmp; - } - } File logFile = SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName); SingleFileLogReader logReader; @@ -343,7 +332,10 @@ public class ConfigNodeRegionStateMachine e); continue; } + + startIndex = endIndex; while (logReader.hasNext()) { + endIndex++; // read and re-serialize the PhysicalPlan ConfigPhysicalPlan nextPlan = logReader.next(); try { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 3432916141..1d4c57716a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -217,6 +217,10 @@ public class ConfigManager implements IManager { private final RetryFailedTasksThread retryFailedTasksThread; + private 
static int schemaPartitionCount = 0; + + private static int dataPartitionCount = 0; + public ConfigManager() throws IOException { // Build the persistence module NodeInfo nodeInfo = new NodeInfo(); @@ -650,40 +654,10 @@ public class ConfigManager implements IManager { partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan); resp = queryResult.convertToRpcSchemaPartitionTableResp(); - StringBuilder devicePathString = new StringBuilder("{"); - for (String devicePath : devicePaths) { - devicePathString.append("\n\t").append(devicePath).append(","); - } - devicePathString.append("\n}"); - - StringBuilder schemaPartitionRespString = new StringBuilder("{"); - schemaPartitionRespString - .append("\n\tTSStatus=") - .append(resp.getStatus().getCode()) - .append(","); - Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable = - resp.getSchemaPartitionTable(); - for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> databaseEntry : - schemaPartitionTable.entrySet()) { - String database = databaseEntry.getKey(); - schemaPartitionRespString.append("\n\tDatabase=").append(database).append(": {"); - for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> slotEntry : - databaseEntry.getValue().entrySet()) { - schemaPartitionRespString - .append("\n\t\t") - .append(slotEntry.getKey()) - .append(", ") - .append(slotEntry.getValue()) - .append(","); - } - schemaPartitionRespString.append("\n\t},"); + schemaPartitionCount += 1; + if (schemaPartitionCount % 100 == 0) { + LOGGER.info("schemaPartition req count: {}", schemaPartitionCount); } - schemaPartitionRespString.append("\n}"); - - LOGGER.info( - "[GetOrCreateSchemaPartition]:\nReceive PathPatternTree: {}, Return TSchemaPartitionTableResp: {}", - devicePathString, - schemaPartitionRespString); return resp; } @@ -753,54 +727,10 @@ public class ConfigManager implements IManager { resp = queryResult.convertToTDataPartitionTableResp(); - StringBuilder partitionSlotsMapString = 
new StringBuilder("{"); - for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> databaseEntry : - getOrCreateDataPartitionReq.getPartitionSlotsMap().entrySet()) { - String database = databaseEntry.getKey(); - partitionSlotsMapString.append("\n\tDatabase=").append(database).append(": {"); - for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> slotEntry : - databaseEntry.getValue().entrySet()) { - partitionSlotsMapString - .append("\n\t\t") - .append(slotEntry.getKey()) - .append(",") - .append(slotEntry.getValue()); - } - partitionSlotsMapString.append("\n\t},"); + dataPartitionCount++; + if (dataPartitionCount % 100 == 0) { + LOGGER.info("dataPartition req count: {}", dataPartitionCount); } - partitionSlotsMapString.append("\n}"); - - StringBuilder dataPartitionRespString = new StringBuilder("{"); - dataPartitionRespString.append("\n\tTSStatus=").append(resp.getStatus().getCode()).append(","); - Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>> - dataPartitionTable = resp.getDataPartitionTable(); - for (Map.Entry< - String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>> - databaseEntry : dataPartitionTable.entrySet()) { - String database = databaseEntry.getKey(); - dataPartitionRespString.append("\n\tDatabase=").append(database).append(": {"); - for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> - seriesSlotEntry : databaseEntry.getValue().entrySet()) { - dataPartitionRespString.append("\n\t\t").append(seriesSlotEntry.getKey()).append(": {"); - for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotEntry : - seriesSlotEntry.getValue().entrySet()) { - dataPartitionRespString - .append("\n\t\t\t") - .append(timeSlotEntry.getKey()) - .append(", ") - .append(timeSlotEntry.getValue()) - .append(","); - } - dataPartitionRespString.append("\n\t\t},"); - } - dataPartitionRespString.append("\n\t}"); - } - dataPartitionRespString.append("\n}"); - - 
LOGGER.info( - "[GetOrCreateDataPartition]:\nReceive PartitionSlotsMap: {}, Return TDataPartitionTableResp: {}", - partitionSlotsMapString, - dataPartitionRespString); return resp; }