This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 0f3ce73 [To rel/0.12][IOTDB-1985]Fix Sync Error in parse sg path
between different os (#4347)
0f3ce73 is described below
commit 0f3ce7381b77e7be9e363890147b78f9b210166c
Author: yschengzi <[email protected]>
AuthorDate: Fri Nov 12 16:32:25 2021 +0800
[To rel/0.12][IOTDB-1985]Fix Sync Error in parse sg path between different
os (#4347)
---
.../apache/iotdb/db/sync/conf/SyncConstant.java | 2 +
.../db/sync/receiver/transfer/SyncServiceImpl.java | 4 +-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 23 +-
.../db/sync/sender/transfer/SyncClientTest.java | 256 ++++++++++++++++++++-
4 files changed, 273 insertions(+), 12 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
index 5d538ff..5a6be8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
@@ -36,6 +36,8 @@ public class SyncConstant {
public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+ public static final String SYNC_FILE_DIR_SEPARATOR = "!";
+
/** Split data file, block size at each transmission */
public static final int DATA_CHUNK_SIZE =
Math.min(64 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index ec38878..ddf726d 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -179,9 +179,9 @@ public class SyncServiceImpl implements SyncService.Iface {
return getSuccessResult();
}
- private String getFilePathByFileInfo(String fileInfo) { // for different os
+ public String getFilePathByFileInfo(String fileInfo) { // for different os
String filePath = "";
- String[] fileInfos = fileInfo.split(SyncConstant.SYNC_DIR_NAME_SEPARATOR);
+ String[] fileInfos = fileInfo.split(SyncConstant.SYNC_FILE_DIR_SEPARATOR);
for (int i = 0; i < fileInfos.length - 1; i++) {
filePath += fileInfos[i] + File.separator;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 434e0bd..b876a5f 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TSocketWrapper;
@@ -797,9 +798,27 @@ public class SyncClient implements ISyncClient {
private String getFileInfoWithVgAndTimePartition(File file) {
return file.getParentFile().getParentFile().getName()
- + SyncConstant.SYNC_DIR_NAME_SEPARATOR
+ + SyncConstant.SYNC_FILE_DIR_SEPARATOR
+ file.getParentFile().getName()
- + SyncConstant.SYNC_DIR_NAME_SEPARATOR
+ + SyncConstant.SYNC_FILE_DIR_SEPARATOR
+ file.getName();
}
+
+ /**
+  * Replaces this client's service handle, connection flag, bookkeeping maps and sender log with
+  * the supplied objects. Marked {@code @TestOnly}: it exists so tests can inject fixtures.
+  *
+  * @param serviceClient value assigned to the {@code serviceClient} field
+  * @param isSyncConnect value assigned to the {@code isSyncConnect} field
+  * @param allSG value assigned to the {@code allSG} field
+  * @param toBeSyncedFilesMap value assigned to the {@code toBeSyncedFilesMap} field
+  * @param deletedFilesMap value assigned to the {@code deletedFilesMap} field
+  * @param lastLocalFilesMap value assigned to the {@code lastLocalFilesMap} field
+  * @param syncLog value assigned to the {@code syncLog} field
+  */
+ @TestOnly
+ public void setContent(
+     SyncService.Client serviceClient,
+     boolean isSyncConnect,
+     Map<String, Map<Long, Set<Long>>> allSG,
+     Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap,
+     Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap,
+     Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap,
+     ISyncSenderLogger syncLog) {
+   // Connection state.
+   this.isSyncConnect = isSyncConnect;
+   this.serviceClient = serviceClient;
+   this.syncLog = syncLog;
+
+   // File bookkeeping; the references are stored as given, no copies are made.
+   this.lastLocalFilesMap = lastLocalFilesMap;
+   this.deletedFilesMap = deletedFilesMap;
+   this.toBeSyncedFilesMap = toBeSyncedFilesMap;
+   this.allSG = allSG;
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
index 7c1fd3a..6254bda 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
@@ -23,14 +23,24 @@ import
org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogAnalyzer;
+import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
+import org.apache.iotdb.service.sync.thrift.SyncService;
+import org.apache.iotdb.service.sync.thrift.SyncStatus;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -38,12 +48,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.nio.ByteBuffer;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -57,13 +64,26 @@ public class SyncClientTest {
private String dataDir;
private ISyncSenderLogAnalyzer senderLogAnalyzer;
+ private String tsfileDataDir;
+ private SyncService.Client serviceClient;
+ private Map<String, Map<Long, Set<Long>>> allSG = new HashMap<>();
+ private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap =
new HashMap<>();
+ private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap = new
HashMap<>();
+ private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap = new
HashMap<>();
+ private ISyncSenderLogger syncLog;
+ private Map<String, List<String>> allFiles = new HashMap<>();
+
+ private String sg1 = "root.sg1";
+ private String sg2 = "root.sg_2";
+
@Before
public void setUp() throws DiskSpaceInsufficientException {
EnvironmentUtils.envSetUp();
dataDir =
- new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
- .getParentFile()
- .getAbsolutePath();
+ tsfileDataDir =
+ new
File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
+ .getParentFile()
+ .getAbsolutePath();
config.update(dataDir);
senderLogAnalyzer = new
SyncSenderLogAnalyzer(config.getSenderFolderPath());
}
@@ -158,4 +178,224 @@ public class SyncClientTest {
assertFalse(new File(config.getSnapshotPath()).exists());
assertTrue(new File(config.getLastFileInfoPath()).exists());
}
+
+ /**
+  * Seeds {@code map} with an entry for {@code sg1} and {@code sg2}; each entry maps key
+  * {@code 0L} to an inner map whose key {@code 0L} holds an empty file set. Existing entries for
+  * those two names are overwritten.
+  */
+ private void prepareMap(Map<String, Map<Long, Map<Long, Set<File>>>> map) {
+   for (String storageGroup : new String[] {sg1, sg2}) {
+     Map<Long, Set<File>> innerLevel = new HashMap<>();
+     innerLevel.put(0L, new HashSet<>());
+
+     Map<Long, Map<Long, Set<File>>> outerLevel = new HashMap<>();
+     outerLevel.put(0L, innerLevel);
+
+     map.put(storageGroup, outerLevel);
+   }
+ }
+
+ /**
+  * Builds the path {@code tsfileDataDir/sgName/vg/partition/tsFileName} and attempts to create
+  * that file and a sibling with the {@code .resource} suffix.
+  *
+  * <p>Creation is best effort: when it throws an {@link IOException} the failure is logged and
+  * the {@link File} handle is still returned.
+  *
+  * @param sgName first path component below {@code tsfileDataDir}
+  * @param vg second path component
+  * @param partition third path component
+  * @param tsFileName file name appended last
+  * @return a handle for the assembled path, whether or not the file could be created
+  */
+ private File getTsFile(String sgName, Integer vg, Integer partition, String tsFileName) {
+   String fileName =
+       String.join(
+           File.separator,
+           tsfileDataDir,
+           sgName,
+           String.valueOf(vg),
+           String.valueOf(partition),
+           tsFileName);
+   File file = new File(fileName);
+   File resource = new File(fileName + ".resource");
+   try {
+     file.createNewFile();
+     resource.createNewFile();
+   } catch (IOException e) {
+     // The "{}" placeholder is required, otherwise SLF4J silently drops the path argument.
+     // The exception goes last so its stack trace is logged too.
+     logger.warn("Can not create tsfile: {}", file.getPath(), e);
+   }
+   return file;
+ }
+
+ /**
+  * Builds the fixtures used by the test: a {@link MockSyncClient}, a sender log, the
+  * {@code allSG} map, the three file maps and the {@code allFiles} path lists, then injects them
+  * into {@code manager} through {@code setContent}.
+  */
+ private void prepare() throws Exception {
+   serviceClient = new MockSyncClient(null);
+   // NOTE(review): dataDir and "sync.log" are joined without a separator - confirm intended.
+   syncLog = new SyncSenderLogger(new File(dataDir + "sync.log"));
+
+   for (String storageGroup : new String[] {sg1, sg2}) {
+     Set<Long> innerIds = new HashSet<>();
+     innerIds.add(0L);
+     Map<Long, Set<Long>> outerIds = new HashMap<>();
+     outerIds.put(0L, innerIds);
+     allSG.put(storageGroup, outerIds);
+   }
+
+   prepareMap(toBeSyncedFilesMap);
+   prepareMap(deletedFilesMap);
+   prepareMap(lastLocalFilesMap);
+
+   addTsFiles(lastLocalFilesMap.get(sg1).get(0L).get(0L), sg1, 1, 4);
+   addTsFiles(lastLocalFilesMap.get(sg2).get(0L).get(0L), sg2, 1, 1);
+
+   addTsFiles(deletedFilesMap.get(sg1).get(0L).get(0L), sg1, 1, 2);
+   addTsFiles(deletedFilesMap.get(sg2).get(0L).get(0L), sg2, 1, 1);
+
+   addTsFiles(toBeSyncedFilesMap.get(sg1).get(0L).get(0L), sg1, 5, 6);
+   addTsFiles(toBeSyncedFilesMap.get(sg2).get(0L).get(0L), sg2, 2, 2);
+
+   List<String> sg1Paths = new ArrayList<>();
+   allFiles.put(sg1, sg1Paths);
+   List<String> sg2Paths = new ArrayList<>();
+   allFiles.put(sg2, sg2Paths);
+   for (int i = 1; i <= 6; i++) {
+     sg1Paths.add(getTsFile(sg1, 0, 0, i + "-" + i + "-0-0.tsfile").getPath());
+   }
+   for (int i = 1; i <= 2; i++) {
+     sg2Paths.add(getTsFile(sg2, 0, 0, i + "-" + i + "-0-0.tsfile").getPath());
+   }
+
+   SyncClient client = (SyncClient) manager;
+   client.setContent(
+       serviceClient,
+       true,
+       allSG,
+       toBeSyncedFilesMap,
+       deletedFilesMap,
+       lastLocalFilesMap,
+       syncLog);
+ }
+
+ /** Adds the files named {@code i-i-0-0.tsfile} for {@code i} in [first, last] to {@code target}. */
+ private void addTsFiles(Set<File> target, String sgName, int first, int last) {
+   for (int i = first; i <= last; i++) {
+     target.add(getTsFile(sgName, 0, 0, i + "-" + i + "-0-0.tsfile"));
+   }
+ }
+
+ /**
+  * Sends deleted-file names and data files for both storage groups through the mock client, then
+  * checks that every recorded string which {@code getFilePathByFileInfo} turns into a
+  * {@code .tsfile} path is a substring of one of the paths collected in {@code allFiles}.
+  */
+ @Test
+ public void testGetFileInfoWithVgAndPartition() {
+   try {
+     prepare();
+     SyncServiceImpl syncServiceImpl = new SyncServiceImpl();
+
+     manager.syncDeletedFilesNameInOneGroup(sg1, 0L, 0L, deletedFilesMap.get(sg1).get(0L).get(0L));
+     manager.syncDeletedFilesNameInOneGroup(sg2, 0L, 0L, deletedFilesMap.get(sg2).get(0L).get(0L));
+
+     manager.syncDataFilesInOneGroup(sg1, 0L, 0L, toBeSyncedFilesMap.get(sg1).get(0L).get(0L));
+     manager.syncDataFilesInOneGroup(sg2, 0L, 0L, toBeSyncedFilesMap.get(sg2).get(0L).get(0L));
+
+     List<String> knownPaths = new ArrayList<>(allFiles.get(sg1));
+     knownPaths.addAll(allFiles.get(sg2));
+
+     for (String fileInfo : ((MockSyncClient) serviceClient).getString()) {
+       String parsedPath = syncServiceImpl.getFilePathByFileInfo(fileInfo);
+       if (!parsedPath.endsWith(".tsfile")) {
+         // Recorded strings that do not parse to a tsfile path are not checked.
+         continue;
+       }
+       boolean found = false;
+       for (String path : knownPaths) {
+         if (path.contains(parsedPath)) {
+           found = true;
+           break;
+         }
+       }
+       assertTrue("No prepared tsfile path contains " + parsedPath, found);
+     }
+   } catch (Exception e) {
+     // Surface the cause instead of failing silently, so a broken run can be diagnosed.
+     logger.error("testGetFileInfoWithVgAndPartition failed", e);
+     Assert.fail("Unexpected exception: " + e);
+   }
+ }
+
+ /**
+  * Stand-in for the thrift sync client that performs no network I/O. It records every string
+  * passed to {@code init}, {@code syncDeletedFileName}, {@code initSyncData} and
+  * {@code checkDataDigest}, and every buffer passed to {@code syncData}. Each call throws a
+  * {@link TException} while {@code isConnected} is false; otherwise it answers with an error or
+  * success status depending on {@code isError}.
+  */
+ static class MockSyncClient extends SyncService.Client {
+   boolean isConnected = true;
+   boolean isError = false;
+
+   List<String> strings = new ArrayList<>();
+   List<ByteBuffer> byteBuffers = new ArrayList<>();
+
+   public MockSyncClient(TProtocol prot) {
+     super(prot);
+   }
+
+   public void setConnected(boolean connected) {
+     this.isConnected = connected;
+   }
+
+   public void setError(boolean error) {
+     this.isError = error;
+   }
+
+   public List<String> getString() {
+     return this.strings;
+   }
+
+   public void setStrings(List<String> strings) {
+     this.strings = strings;
+   }
+
+   public List<ByteBuffer> getByteBuffer() {
+     return this.byteBuffers;
+   }
+
+   public void setByteBuffers(List<ByteBuffer> byteBuffers) {
+     this.byteBuffers = byteBuffers;
+   }
+
+   /** Produces the canned reply carrying {@code msg}, or throws when "disconnected". */
+   private SyncStatus reply(String msg) throws TException {
+     if (!isConnected) {
+       throw new TException("Read time out");
+     }
+     return isError
+         ? new SyncStatus(SyncConstant.ERROR_CODE, msg)
+         : new SyncStatus(SyncConstant.SUCCESS_CODE, msg);
+   }
+
+   /** Same as {@link #reply(String)} with an empty message. */
+   private SyncStatus reply() throws TException {
+     return reply("");
+   }
+
+   @Override
+   public SyncStatus check(ConfirmInfo info) throws TException {
+     return reply();
+   }
+
+   @Override
+   public SyncStatus startSync() throws TException {
+     return reply();
+   }
+
+   @Override
+   public SyncStatus init(String storageGroupName) throws TException {
+     strings.add(storageGroupName);
+     return reply();
+   }
+
+   @Override
+   public SyncStatus syncDeletedFileName(String fileName) throws TException {
+     strings.add(fileName);
+     return reply();
+   }
+
+   @Override
+   public SyncStatus initSyncData(String filename) throws TException {
+     strings.add(filename);
+     return reply();
+   }
+
+   @Override
+   public SyncStatus syncData(ByteBuffer buff) throws TException {
+     byteBuffers.add(buff);
+     return reply();
+   }
+
+   @Override
+   public SyncStatus checkDataDigest(String md5) throws TException {
+     strings.add(md5);
+     // The digest is echoed back as the status message.
+     return reply(md5);
+   }
+
+   @Override
+   public SyncStatus endSync() throws TException {
+     return reply();
+   }
+ }
}