This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch revert-918-fix_add_duplicated_metadata_bug in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e51b03b805e99987cd27e9e59fdadc43972a74e9 Author: Tianan Li <[email protected]> AuthorDate: Wed Mar 18 09:20:45 2020 +0800 Revert "Fix bugs of add duplicated metadata (#918)" This reverts commit a92b80cdd72f030b90b4d150bfd9fedc8b1e4a32. --- .../engine/storagegroup/StorageGroupProcessor.java | 17 +++------------- .../org/apache/iotdb/db/metadata/MManager.java | 2 +- .../java/org/apache/iotdb/db/metadata/MTree.java | 3 --- .../db/sync/receiver/transfer/SyncServiceImpl.java | 16 +-------------- .../db/sync/sender/manage/SyncFileManager.java | 10 ++++++++++ .../iotdb/db/sync/sender/transfer/SyncClient.java | 9 ++------- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 1 - .../org/apache/iotdb/db/metadata/MTreeTest.java | 2 +- service-rpc/src/main/thrift/sync.thrift | 23 +++++----------------- 9 files changed, 23 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 6642ae1..674c0fd 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -1537,9 +1537,8 @@ public class StorageGroupProcessor { writeLock(); mergeLock.writeLock().lock(); try { - if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource)){ - updateLatestTimeMap(newTsFileResource); - } + loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource); + updateLatestTimeMap(newTsFileResource); } catch (DiskSpaceInsufficientException e) { logger.error( "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", @@ -1781,9 +1780,8 @@ public class StorageGroupProcessor { * @param type load type * @param tsFileResource tsfile resource to be loaded * @UsedBy sync module, load external tsfile module. - * @return load the file successfully */ - private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile, + private void loadTsFileByType(LoadTsFileType type, File syncedTsFile, TsFileResource tsFileResource) throws TsFileProcessorException, DiskSpaceInsufficientException { File targetFile; @@ -1795,10 +1793,6 @@ public class StorageGroupProcessor { storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource .getFile().getName()); tsFileResource.setFile(targetFile); - if(unSequenceFileList.contains(tsFileResource)){ - logger.error("The file {} has already been loaded in unsequence list", tsFileResource); - return false; - } unSequenceFileList.add(tsFileResource); logger.info("Load tsfile in unsequence list, move file from {} to {}", syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath()); @@ -1809,10 +1803,6 @@ public class StorageGroupProcessor { storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource.getFile().getName()); tsFileResource.setFile(targetFile); - if(sequenceFileTreeSet.contains(tsFileResource)){ - logger.error("The file {} has already been loaded in sequence list", tsFileResource); - return false; - } sequenceFileTreeSet.add(tsFileResource); logger.info("Load tsfile in sequence list, move file from {} to {}", syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath()); @@ -1850,7 +1840,6 @@ public class StorageGroupProcessor { syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e.getMessage())); } - return true; } /** diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 94a382f..afd72f3 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -453,13 +453,13 @@ public class MManager { public void setStorageGroup(String storageGroup) throws MetadataException { lock.writeLock().lock(); try { - mtree.setStorageGroup(storageGroup); if (writeToLog) { BufferedWriter writer = getLogWriter(); writer.write(MetadataOperationType.SET_STORAGE_GROUP + "," + storageGroup); writer.newLine(); writer.flush(); } + mtree.setStorageGroup(storageGroup); IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1); ActiveTimeSeriesCounter.getInstance().init(storageGroup); seriesNumberInStorageGroups.put(storageGroup, 0); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 896b821..4869b70 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -102,9 +102,6 @@ public class MTree implements Serializable { } MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding, compressor, props); - if (cur.hasChild(leaf.getName())) { - throw new MetadataException(String.format("The timeseries %s has already existed.", path)); - } cur.addChild(leaf); } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java index 169a689..6096a53 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java @@ -29,7 +29,6 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -46,7 +45,6 @@ import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer; import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.SyncUtils; -import org.apache.iotdb.service.sync.thrift.ConfirmInfo; import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.iotdb.service.sync.thrift.SyncStatus; import org.apache.thrift.TException; @@ -77,20 +75,8 @@ public class SyncServiceImpl implements SyncService.Iface { * Verify IP address of sender */ @Override - public SyncStatus check(ConfirmInfo info) { - String ipAddress = info.address, uuid = info.uuid; + public SyncStatus check(String ipAddress, String uuid) { Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName()); - if (!info.version.equals(IoTDBConstant.VERSION)) { - return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>", - info.version, IoTDBConstant.VERSION)); - } - if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig() - .getPartitionInterval()) { - return getErrorResult(String - .format("Partition interval mismatch: the sender <%d>, the receiver <%d>", - info.partitionInterval, - IoTDBDescriptor.getInstance().getConfig().getPartitionInterval())); - } if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) { senderName.set(ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid); if (checkRecovery()) { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java index d6e360e..6c4c569 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java @@ -102,6 +102,16 @@ public class SyncFileManager implements ISyncFileManager { .equals(TsFileConstant.PATH_UPGRADE)) { continue; } + try { + if (!MManager.getInstance().getStorageGroupName(sgFolder.getName()) + .equals(sgFolder.getName())) { + // the folder is not a sg folder + continue; + } + } catch (MetadataException e) { + // the folder is not a sg folder + continue; + } allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>()); currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>()); for (File timeRangeFolder : sgFolder.listFiles()) { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java index e627e43..653c6c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java @@ -53,7 +53,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; -import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.SyncConnectionException; @@ -68,7 +67,6 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger; import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer; import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger; import org.apache.iotdb.db.utils.SyncUtils; -import org.apache.iotdb.service.sync.thrift.ConfirmInfo; import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.iotdb.service.sync.thrift.SyncStatus; import org.apache.iotdb.tsfile.utils.BytesUtils; @@ -288,12 +286,9 @@ public class SyncClient implements ISyncClient { @Override public void confirmIdentity() throws SyncConnectionException { - try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) { - ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(), - getOrCreateUUID(getUuidFile()), - IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION); + try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())){ SyncStatus status = serviceClient - .check(info); + .check(socket.getLocalAddress().getHostAddress(), getOrCreateUUID(getUuidFile())); if (status.code != SUCCESS_CODE) { throw new SyncConnectionException( "The receiver rejected the synchronization task because " + status.msg); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 64f04c6..765c17a 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -55,7 +55,6 @@ public class FileLoaderUtils { } else { tsFileResource.deserialize(); } - tsFileResource.setClosed(true); } public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader, diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java index 3f6d67e..6cd9783 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java @@ -296,7 +296,7 @@ public class MTreeTest { root.setStorageGroup("root.laptop.d2"); root.createTimeseries("root.laptop.d1.s1", TSDataType.INT32, TSEncoding.PLAIN, CompressionType.GZIP, null); - root.createTimeseries("root.laptop.d1.s2", TSDataType.INT32, TSEncoding.PLAIN, + root.createTimeseries("root.laptop.d1.s1", TSDataType.INT32, TSEncoding.PLAIN, CompressionType.GZIP, null); List<String> list = new ArrayList<>(); diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift index 3e243f4..9b20673 100755 --- a/service-rpc/src/main/thrift/sync.thrift +++ b/service-rpc/src/main/thrift/sync.thrift @@ -18,28 +18,15 @@ */ namespace java org.apache.iotdb.service.sync.thrift -struct SyncStatus{ - 1:required i32 code - 2:required string msg -} - -// The sender and receiver need to check some info to confirm validity -struct ConfirmInfo{ - // check whether the ip of sender is in thw white list of receiver. - 1:string address +typedef i32 int - // Sender needs to tell receiver its identity. - 2:string uuid - - // The partition interval of sender and receiver need to be the same. - 3:i64 partitionInterval - - // The version of sender and receiver need to be the same. - 4:string version +struct SyncStatus{ + required i32 code + required string msg } service SyncService{ - SyncStatus check(ConfirmInfo info) + SyncStatus check(1:string address, 2:string uuid) SyncStatus startSync(); SyncStatus init(1:string storageGroupName) SyncStatus syncDeletedFileName(1:string fileName)
