This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 3f205e1  [To rel/0.12][IOTDB-1877] Fix Sync recovery and reconnection bugs in both sender and receiver (#4217)
3f205e1 is described below

commit 3f205e18f21b3dfb673df4f5bae50eab4e919d2c
Author: yschengzi <[email protected]>
AuthorDate: Wed Oct 27 17:28:26 2021 +0800

    [To rel/0.12][IOTDB-1877] Fix Sync recovery and reconnection bugs in both sender and receiver (#4217)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +++++++++++++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 23 ++++------------------
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 13 +++++++-----
 3 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3688151..f5845c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1170,6 +1170,20 @@ public class IoTDBConfig {
     this.languageVersion = languageVersion;
   }
 
+  public String getIoTDBVersion() {
+    return IoTDBConstant.VERSION;
+  }
+
+  public String getIoTDBMajorVersion() {
+    return IoTDBConstant.MAJOR_VERSION;
+  }
+
+  public String getIoTDBMajorVersion(String version) {
+    return version.equals("UNKNOWN")
+        ? "UNKNOWN"
+        : version.split("\\.")[0] + "." + version.split("\\.")[1];
+  }
+
   public String getIpWhiteList() {
     return ipWhiteList;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 7ad15ba..ec38878 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.sync.receiver.transfer;
 
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -78,11 +77,11 @@ public class SyncServiceImpl implements SyncService.Iface {
   public SyncStatus check(ConfirmInfo info) {
     String ipAddress = info.address, uuid = info.uuid;
     Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
-    if (!getMajorVersion(info.version).equals(IoTDBConstant.MAJOR_VERSION)) {
+    if (!config.getIoTDBMajorVersion(info.version).equals(config.getIoTDBMajorVersion())) {
       return getErrorResult(
           String.format(
               "Version mismatch: the sender <%s>, the receiver <%s>",
-              info.version, IoTDBConstant.MAJOR_VERSION));
+              info.version, config.getIoTDBVersion()));
     }
     if (info.partitionInterval
         != IoTDBDescriptor.getInstance().getConfig().getPartitionInterval()) {
@@ -106,12 +105,6 @@ public class SyncServiceImpl implements SyncService.Iface {
     }
   }
 
-  private String getMajorVersion(String version) {
-    return version.equals("UNKNOWN")
-        ? "UNKNOWN"
-        : version.split("\\.")[0] + "." + version.split("\\.")[1];
-  }
-
   private boolean checkRecovery() {
     try {
       if (currentFileWriter.get() != null) {
@@ -173,6 +166,7 @@ public class SyncServiceImpl implements SyncService.Iface {
   public SyncStatus syncDeletedFileName(String fileInfo) {
     String filePath = currentSG.get() + File.separator + getFilePathByFileInfo(fileInfo);
     try {
+      syncLog.get().startSyncDeletedFilesName();
       syncLog.get().finishSyncDeletedFileName(new File(getSyncDataPath(), filePath));
       FileLoaderManager.getInstance()
           .getFileLoader(senderName.get())
@@ -199,15 +193,13 @@ public class SyncServiceImpl implements SyncService.Iface {
   public SyncStatus initSyncData(String fileInfo) {
     File file;
     String filePath = fileInfo;
-    logger.info("initSyncData receive: " + fileInfo); // change back
     try {
-      if (currentSG.get() == null) { // schema mlog.txt file
+      if (fileInfo.equals(MetadataConstant.METADATA_LOG)) { // schema mlog.txt file
         file = new File(getSyncDataPath(), filePath);
       } else {
         filePath = currentSG.get() + File.separator + getFilePathByFileInfo(fileInfo);
         file = new File(getSyncDataPath(), filePath);
       }
-      logger.info("initSyncData file: " + file); // change back
       file.delete();
       currentFile.set(file);
       if (!file.getParentFile().exists()) {
@@ -219,8 +211,6 @@ public class SyncServiceImpl implements SyncService.Iface {
       currentFileWriter.set(new FileOutputStream(file));
       syncLog.get().startSyncTsFiles();
       messageDigest.set(MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME));
-      logger.info(
-          "initSyncData currentFileWriter: " + currentFileWriter.get().toString()); // change back
     } catch (IOException | NoSuchAlgorithmException e) {
       logger.error("Can not init sync resource for file {}", filePath, e);
       return getErrorResult(
@@ -234,11 +224,6 @@ public class SyncServiceImpl implements SyncService.Iface {
   public SyncStatus syncData(ByteBuffer buff) {
     try {
       int pos = buff.position();
-      logger.info("syncData currentSG: " + currentSG.get()); // change back
-      logger.info("syncData currentFileWriter: " + currentFileWriter.get().toString());
-      logger.info(
-          "syncData currentFileWriterChannel is Open: "
-              + currentFileWriter.get().getChannel().isOpen());
       currentFileWriter.get().getChannel().write(buff);
       buff.position(pos);
       messageDigest.get().update(buff);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 06ed0b6..d197f9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.sync.sender.transfer;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.SyncConnectionException;
@@ -265,12 +264,12 @@ public class SyncClient implements ISyncClient {
           dataDirs.length);
 
       config.update(dataDir);
+      checkRecovery();
       syncFileManager.getValidFiles(dataDir);
       allSG = syncFileManager.getAllSGs();
       lastLocalFilesMap = syncFileManager.getLastLocalFilesMap();
       deletedFilesMap = syncFileManager.getDeletedFilesMap();
       toBeSyncedFilesMap = syncFileManager.getToBeSyncedFilesMap();
-      checkRecovery();
       if (SyncUtils.isEmpty(deletedFilesMap) && SyncUtils.isEmpty(toBeSyncedFilesMap)) {
         logger.info("There has no data to sync in data dir {}", dataDir);
         continue;
@@ -333,7 +332,9 @@ public class SyncClient implements ISyncClient {
 
     try {
       establishConnection(config.getServerIp(), config.getServerPort());
-    } catch (SyncConnectionException e) {
+      confirmIdentity();
+      serviceClient.startSync();
+    } catch (SyncConnectionException | TException e) {
       logger.warn("Can not reconnect to receiver {}. Caused by ", config.getSyncReceiverName(), e);
       return false;
     }
@@ -348,7 +349,7 @@ public class SyncClient implements ISyncClient {
               socket.getLocalAddress().getHostAddress(),
               getOrCreateUUID(getUuidFile()),
               ioTDBConfig.getPartitionInterval(),
-              IoTDBConstant.MAJOR_VERSION);
+              ioTDBConfig.getIoTDBMajorVersion());
       SyncStatus status = serviceClient.check(info);
       if (status.code != SUCCESS_CODE) {
         throw new SyncConnectionException(
@@ -396,7 +397,6 @@ public class SyncClient implements ISyncClient {
       return;
     }
     int retryCount = 0;
-    serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
     while (true) {
       if (retryCount > config.getMaxNumOfSyncFileRetry()) {
         throw new SyncConnectionException(
@@ -421,6 +421,8 @@ public class SyncClient implements ISyncClient {
     // start to sync file data and get digest of this file.
     try (FileInputStream fis = new FileInputStream(getSchemaLogFile());
         ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) {
+      serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
+
       long skipNum = fis.skip(schemaFilePos);
       if (skipNum != schemaFilePos) {
         logger.warn(
@@ -594,6 +596,7 @@ public class SyncClient implements ISyncClient {
         }
       } catch (TException e) {
         logger.error("Can not sync deleted file name {}, skip it.", file);
+        isSyncConnect = false;
       }
     }
     logger.info("Finish to sync names of deleted files in storage group {}", sgName);

Reply via email to