This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch fix_add_duplicated_metadata_bug in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit da4ba74377c850230b28a0cbf59d3526ebbb48fb Author: lta <[email protected]> AuthorDate: Tue Mar 17 19:58:51 2020 +0800 fix bug of add duplicated bugs --- .../engine/storagegroup/StorageGroupProcessor.java | 17 +++++++++++++--- .../org/apache/iotdb/db/metadata/MManager.java | 4 ++++ .../java/org/apache/iotdb/db/metadata/MTree.java | 3 +++ .../db/sync/receiver/transfer/SyncServiceImpl.java | 16 ++++++++++++++- .../db/sync/sender/manage/SyncFileManager.java | 10 ---------- .../iotdb/db/sync/sender/transfer/SyncClient.java | 9 +++++++-- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 1 + service-rpc/src/main/thrift/sync.thrift | 23 +++++++++++++++++----- 8 files changed, 62 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 674c0fd..6642ae1 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -1537,8 +1537,9 @@ public class StorageGroupProcessor { writeLock(); mergeLock.writeLock().lock(); try { - loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource); - updateLatestTimeMap(newTsFileResource); + if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource)){ + updateLatestTimeMap(newTsFileResource); + } } catch (DiskSpaceInsufficientException e) { logger.error( "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", @@ -1780,8 +1781,9 @@ public class StorageGroupProcessor { * @param type load type * @param tsFileResource tsfile resource to be loaded * @UsedBy sync module, load external tsfile module. + * @return load the file successfully */ - private void loadTsFileByType(LoadTsFileType type, File syncedTsFile, + private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile, TsFileResource tsFileResource) throws TsFileProcessorException, DiskSpaceInsufficientException { File targetFile; @@ -1793,6 +1795,10 @@ public class StorageGroupProcessor { storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource .getFile().getName()); tsFileResource.setFile(targetFile); + if(unSequenceFileList.contains(tsFileResource)){ + logger.error("The file {} has already been loaded in unsequence list", tsFileResource); + return false; + } unSequenceFileList.add(tsFileResource); logger.info("Load tsfile in unsequence list, move file from {} to {}", syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath()); @@ -1803,6 +1809,10 @@ public class StorageGroupProcessor { storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource.getFile().getName()); tsFileResource.setFile(targetFile); + if(sequenceFileTreeSet.contains(tsFileResource)){ + logger.error("The file {} has already been loaded in sequence list", tsFileResource); + return false; + } sequenceFileTreeSet.add(tsFileResource); logger.info("Load tsfile in sequence list, move file from {} to {}", syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath()); @@ -1840,6 +1850,7 @@ public class StorageGroupProcessor { syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e.getMessage())); } + return true; } /** diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index afd72f3..c20032c 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -451,6 +451,10 @@ public class MManager { * @param storageGroup root.node.(node)* */ public void setStorageGroup(String storageGroup) throws MetadataException { + if (mtree.checkStorageGroupByPath(storageGroup)) { + throw new MetadataException( + String.format("Storage group <%s> has already existed.", storageGroup)); + } lock.writeLock().lock(); try { if (writeToLog) { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 4869b70..896b821 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -102,6 +102,9 @@ public class MTree implements Serializable { } MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding, compressor, props); + if (cur.hasChild(leaf.getName())) { + throw new MetadataException(String.format("The timeseries %s has already existed.", path)); + } cur.addChild(leaf); } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java index 6096a53..169a689 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java @@ -29,6 +29,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -45,6 +46,7 @@ import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer; import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.SyncUtils; +import org.apache.iotdb.service.sync.thrift.ConfirmInfo; import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.iotdb.service.sync.thrift.SyncStatus; import org.apache.thrift.TException; @@ -75,8 +77,20 @@ public class SyncServiceImpl implements SyncService.Iface { * Verify IP address of sender */ @Override - public SyncStatus check(String ipAddress, String uuid) { + public SyncStatus check(ConfirmInfo info) { + String ipAddress = info.address, uuid = info.uuid; Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName()); + if (!info.version.equals(IoTDBConstant.VERSION)) { + return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>", + info.version, IoTDBConstant.VERSION)); + } + if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig() + .getPartitionInterval()) { + return getErrorResult(String + .format("Partition interval mismatch: the sender <%d>, the receiver <%d>", + info.partitionInterval, + IoTDBDescriptor.getInstance().getConfig().getPartitionInterval())); + } if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) { senderName.set(ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid); if (checkRecovery()) { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java index 6c4c569..d6e360e 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java @@ -102,16 +102,6 @@ public class SyncFileManager implements ISyncFileManager { .equals(TsFileConstant.PATH_UPGRADE)) { continue; } - try { - if (!MManager.getInstance().getStorageGroupName(sgFolder.getName()) - .equals(sgFolder.getName())) { - // the folder is not a sg folder - continue; - } - } catch (MetadataException e) { - // the folder is not a sg folder - continue; - } allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>()); currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>()); for (File timeRangeFolder : sgFolder.listFiles()) { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java index 653c6c2..e627e43 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.SyncConnectionException; @@ -67,6 +68,7 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger; import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer; import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger; import org.apache.iotdb.db.utils.SyncUtils; +import org.apache.iotdb.service.sync.thrift.ConfirmInfo; import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.iotdb.service.sync.thrift.SyncStatus; import org.apache.iotdb.tsfile.utils.BytesUtils; @@ -286,9 +288,12 @@ public class SyncClient implements ISyncClient { @Override public void confirmIdentity() throws SyncConnectionException { - try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())){ + try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) { + ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(), + getOrCreateUUID(getUuidFile()), + IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION); SyncStatus status = serviceClient - .check(socket.getLocalAddress().getHostAddress(), getOrCreateUUID(getUuidFile())); + .check(info); if (status.code != SUCCESS_CODE) { throw new SyncConnectionException( "The receiver rejected the synchronization task because " + status.msg); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 765c17a..64f04c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -55,6 +55,7 @@ public class FileLoaderUtils { } else { tsFileResource.deserialize(); } + tsFileResource.setClosed(true); } public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader, diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift index 9b20673..3e243f4 100755 --- a/service-rpc/src/main/thrift/sync.thrift +++ b/service-rpc/src/main/thrift/sync.thrift @@ -18,15 +18,28 @@ */ namespace java org.apache.iotdb.service.sync.thrift -typedef i32 int - struct SyncStatus{ - required i32 code - required string msg + 1:required i32 code + 2:required string msg +} + +// The sender and receiver need to check some info to confirm validity +struct ConfirmInfo{ + // check whether the ip of sender is in thw white list of receiver. + 1:string address + + // Sender needs to tell receiver its identity. + 2:string uuid + + // The partition interval of sender and receiver need to be the same. + 3:i64 partitionInterval + + // The version of sender and receiver need to be the same. + 4:string version } service SyncService{ - SyncStatus check(1:string address, 2:string uuid) + SyncStatus check(ConfirmInfo info) SyncStatus startSync(); SyncStatus init(1:string storageGroupName) SyncStatus syncDeletedFileName(1:string fileName)
