This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 0f3ce73  [To rel/0.12][IOTDB-1985]Fix Sync Error in parse sg path between different os (#4347)
0f3ce73 is described below

commit 0f3ce7381b77e7be9e363890147b78f9b210166c
Author: yschengzi <[email protected]>
AuthorDate: Fri Nov 12 16:32:25 2021 +0800

    [To rel/0.12][IOTDB-1985]Fix Sync Error in parse sg path between different os (#4347)
---
 .../apache/iotdb/db/sync/conf/SyncConstant.java    |   2 +
 .../db/sync/receiver/transfer/SyncServiceImpl.java |   4 +-
 .../iotdb/db/sync/sender/transfer/SyncClient.java  |  23 +-
 .../db/sync/sender/transfer/SyncClientTest.java    | 256 ++++++++++++++++++++-
 4 files changed, 273 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
index 5d538ff..5a6be8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
@@ -36,6 +36,8 @@ public class SyncConstant {
 
   public static final String SYNC_DIR_NAME_SEPARATOR = "_";
 
+  public static final String SYNC_FILE_DIR_SEPARATOR = "!";
+
   /** Split data file, block size at each transmission */
   public static final int DATA_CHUNK_SIZE =
       Math.min(64 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index ec38878..ddf726d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -179,9 +179,9 @@ public class SyncServiceImpl implements SyncService.Iface {
     return getSuccessResult();
   }
 
-  private String getFilePathByFileInfo(String fileInfo) { // for different os
+  public String getFilePathByFileInfo(String fileInfo) { // for different os
     String filePath = "";
-    String[] fileInfos = fileInfo.split(SyncConstant.SYNC_DIR_NAME_SEPARATOR);
+    String[] fileInfos = fileInfo.split(SyncConstant.SYNC_FILE_DIR_SEPARATOR);
     for (int i = 0; i < fileInfos.length - 1; i++) {
       filePath += fileInfos[i] + File.separator;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 434e0bd..b876a5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TSocketWrapper;
@@ -797,9 +798,27 @@ public class SyncClient implements ISyncClient {
 
   private String getFileInfoWithVgAndTimePartition(File file) {
     return file.getParentFile().getParentFile().getName()
-        + SyncConstant.SYNC_DIR_NAME_SEPARATOR
+        + SyncConstant.SYNC_FILE_DIR_SEPARATOR
         + file.getParentFile().getName()
-        + SyncConstant.SYNC_DIR_NAME_SEPARATOR
+        + SyncConstant.SYNC_FILE_DIR_SEPARATOR
         + file.getName();
   }
+
+  @TestOnly
+  public void setContent(
+      SyncService.Client serviceClient,
+      boolean isSyncConnect,
+      Map<String, Map<Long, Set<Long>>> allSG,
+      Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap,
+      Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap,
+      Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap,
+      ISyncSenderLogger syncLog) {
+    this.serviceClient = serviceClient;
+    this.isSyncConnect = isSyncConnect;
+    this.allSG = allSG;
+    this.toBeSyncedFilesMap = toBeSyncedFilesMap;
+    this.deletedFilesMap = deletedFilesMap;
+    this.lastLocalFilesMap = lastLocalFilesMap;
+    this.syncLog = syncLog;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
index 7c1fd3a..6254bda 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
@@ -23,14 +23,24 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
 import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
 import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
 import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogAnalyzer;
+import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
+import org.apache.iotdb.service.sync.thrift.SyncService;
+import org.apache.iotdb.service.sync.thrift.SyncStatus;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -38,12 +48,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.nio.ByteBuffer;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -57,13 +64,26 @@ public class SyncClientTest {
   private String dataDir;
   private ISyncSenderLogAnalyzer senderLogAnalyzer;
 
+  private String tsfileDataDir;
+  private SyncService.Client serviceClient;
+  private Map<String, Map<Long, Set<Long>>> allSG = new HashMap<>();
+  private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap = new HashMap<>();
+  private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap = new HashMap<>();
+  private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap = new HashMap<>();
+  private ISyncSenderLogger syncLog;
+  private Map<String, List<String>> allFiles = new HashMap<>();
+
+  private String sg1 = "root.sg1";
+  private String sg2 = "root.sg_2";
+
   @Before
   public void setUp() throws DiskSpaceInsufficientException {
     EnvironmentUtils.envSetUp();
     dataDir =
-        new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
-            .getParentFile()
-            .getAbsolutePath();
+        tsfileDataDir =
+            new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
+                .getParentFile()
+                .getAbsolutePath();
     config.update(dataDir);
     senderLogAnalyzer = new SyncSenderLogAnalyzer(config.getSenderFolderPath());
   }
@@ -158,4 +178,224 @@ public class SyncClientTest {
     assertFalse(new File(config.getSnapshotPath()).exists());
     assertTrue(new File(config.getLastFileInfoPath()).exists());
   }
+
+  private void prepareMap(Map<String, Map<Long, Map<Long, Set<File>>>> map) {
+    map.put(sg1, new HashMap<>());
+    map.put(sg2, new HashMap<>());
+    map.get(sg1).put(0l, new HashMap<>());
+    map.get(sg2).put(0l, new HashMap<>());
+    map.get(sg1).get(0l).put(0l, new HashSet<>());
+    map.get(sg2).get(0l).put(0l, new HashSet<>());
+  }
+
+  private File getTsFile(String sgName, Integer vg, Integer partition, String tsFileName) {
+    String fileName =
+        tsfileDataDir
+            + File.separator
+            + sgName
+            + File.separator
+            + vg
+            + File.separator
+            + partition
+            + File.separator
+            + tsFileName;
+    File file = new File(fileName);
+    File resource = new File(fileName + ".resource");
+    try {
+      file.createNewFile();
+      resource.createNewFile();
+    } catch (IOException e) {
+      logger.warn("Can not create tsfile: ", file.getPath());
+    }
+    return file;
+  }
+
+  private void prepare() throws Exception {
+    serviceClient = new MockSyncClient(null);
+    syncLog = new SyncSenderLogger(new File(dataDir + "sync.log"));
+
+    allSG.put(sg1, new HashMap<>());
+    allSG.put(sg2, new HashMap<>());
+    allSG.get(sg1).put(0l, new HashSet<>());
+    allSG.get(sg2).put(0l, new HashSet<>());
+    allSG.get(sg1).get(0l).add(0l);
+    allSG.get(sg2).get(0l).add(0l);
+
+    prepareMap(toBeSyncedFilesMap);
+    prepareMap(deletedFilesMap);
+    prepareMap(lastLocalFilesMap);
+
+    lastLocalFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "1-1-0-0.tsfile"));
+    lastLocalFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "2-2-0-0.tsfile"));
+    lastLocalFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "3-3-0-0.tsfile"));
+    lastLocalFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "4-4-0-0.tsfile"));
+    lastLocalFilesMap.get(sg2).get(0l).get(0l).add(getTsFile(sg2, 0, 0, "1-1-0-0.tsfile"));
+
+    deletedFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "1-1-0-0.tsfile"));
+    deletedFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "2-2-0-0.tsfile"));
+    deletedFilesMap.get(sg2).get(0l).get(0l).add(getTsFile(sg2, 0, 0, "1-1-0-0.tsfile"));
+
+    toBeSyncedFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "5-5-0-0.tsfile"));
+    toBeSyncedFilesMap.get(sg1).get(0l).get(0l).add(getTsFile(sg1, 0, 0, "6-6-0-0.tsfile"));
+    toBeSyncedFilesMap.get(sg2).get(0l).get(0l).add(getTsFile(sg2, 0, 0, "2-2-0-0.tsfile"));
+
+    allFiles.put(sg1, new ArrayList<>());
+    allFiles.put(sg2, new ArrayList<>());
+    for (int i = 1; i <= 6; i++) {
+      allFiles.get(sg1).add(getTsFile(sg1, 0, 0, i + "-" + i + "-0-0.tsfile").getPath());
+    }
+    for (int i = 1; i <= 2; i++) {
+      allFiles.get(sg2).add(getTsFile(sg2, 0, 0, i + "-" + i + "-0-0.tsfile").getPath());
+    }
+
+    ((SyncClient) manager)
+        .setContent(
+            serviceClient,
+            true,
+            allSG,
+            toBeSyncedFilesMap,
+            deletedFilesMap,
+            lastLocalFilesMap,
+            syncLog);
+  }
+
+  @Test
+  public void testGetFileInfoWithVgAndPartition() {
+    try {
+      prepare();
+      List<String> serviceClientStrings;
+      SyncServiceImpl syncServiceImpl = new SyncServiceImpl();
+
+      manager.syncDeletedFilesNameInOneGroup(sg1, 0l, 0l, deletedFilesMap.get(sg1).get(0l).get(0l));
+      manager.syncDeletedFilesNameInOneGroup(sg2, 0l, 0l, deletedFilesMap.get(sg2).get(0l).get(0l));
+
+      manager.syncDataFilesInOneGroup(sg1, 0l, 0l, toBeSyncedFilesMap.get(sg1).get(0l).get(0l));
+      manager.syncDataFilesInOneGroup(sg2, 0l, 0l, toBeSyncedFilesMap.get(sg2).get(0l).get(0l));
+
+      serviceClientStrings = ((MockSyncClient) serviceClient).getString();
+
+      for (String string : serviceClientStrings) {
+        string = syncServiceImpl.getFilePathByFileInfo(string);
+        if (string.endsWith(".tsfile")) {
+          boolean in = false;
+          for (String path : allFiles.get(sg1)) {
+            if (path.contains(string)) in = true;
+          }
+          for (String path : allFiles.get(sg2)) {
+            if (path.contains(string)) in = true;
+          }
+          assertTrue(in);
+        }
+      }
+    } catch (Exception e) {
+      Assert.fail();
+    }
+  }
+
+  static class MockSyncClient extends SyncService.Client {
+    boolean isConnected = true;
+    boolean isError = false;
+
+    List<String> strings = new ArrayList<>();
+    List<ByteBuffer> byteBuffers = new ArrayList<>();
+
+    public void setConnected(boolean connected) {
+      isConnected = connected;
+    }
+
+    public void setError(boolean error) {
+      isError = error;
+    }
+
+    public List<String> getString() {
+      return strings;
+    }
+
+    public void setStrings(List<String> strings) {
+      this.strings = strings;
+    }
+
+    public List<ByteBuffer> getByteBuffer() {
+      return byteBuffers;
+    }
+
+    public void setByteBuffers(List<ByteBuffer> byteBuffers) {
+      this.byteBuffers = byteBuffers;
+    }
+
+    public MockSyncClient(TProtocol prot) {
+      super(prot);
+    }
+
+    private SyncStatus getSuccessStatus() {
+      return new SyncStatus(SyncConstant.SUCCESS_CODE, "");
+    }
+
+    private SyncStatus getSuccessStatus(String msg) {
+      return new SyncStatus(SyncConstant.SUCCESS_CODE, msg);
+    }
+
+    private SyncStatus getErrorStatus() {
+      return new SyncStatus(SyncConstant.ERROR_CODE, "");
+    }
+
+    private SyncStatus getErrorStatus(String msg) {
+      return new SyncStatus(SyncConstant.ERROR_CODE, msg);
+    }
+
+    private SyncStatus getMockSyncStatus() throws TException {
+      if (!isConnected) throw new TException("Read time out");
+      return isError ? getErrorStatus() : getSuccessStatus();
+    }
+
+    private SyncStatus getMockSyncStatus(String msg) throws TException {
+      if (!isConnected) throw new TException("Read time out");
+      return isError ? getErrorStatus(msg) : getSuccessStatus(msg);
+    }
+
+    @Override
+    public SyncStatus check(ConfirmInfo info) throws TException {
+      return getMockSyncStatus();
+    }
+
+    @Override
+    public SyncStatus startSync() throws TException {
+      return getMockSyncStatus();
+    }
+
+    @Override
+    public SyncStatus init(String storageGroupName) throws TException {
+      strings.add(storageGroupName);
+      return getMockSyncStatus();
+    }
+
+    @Override
+    public SyncStatus syncDeletedFileName(String fileName) throws TException {
+      strings.add(fileName);
+      return getMockSyncStatus();
+    }
+
+    @Override
+    public SyncStatus initSyncData(String filename) throws TException {
+      strings.add(filename);
+      return getMockSyncStatus();
+    }
+
+    @Override
+    public SyncStatus syncData(ByteBuffer buff) throws TException {
+      byteBuffers.add(buff);
+      return getMockSyncStatus();
+    }
+
+    @Override
+    public SyncStatus checkDataDigest(String md5) throws TException {
+      strings.add(md5);
+      return getMockSyncStatus(md5);
+    }
+
+    @Override
+    public SyncStatus endSync() throws TException {
+      return getMockSyncStatus();
+    }
+  }
 }

Reply via email to