This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch reimpl_sync in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d287ba13fefa56a1909e96362def3fbb52bf8329 Author: lta <[email protected]> AuthorDate: Wed Sep 4 11:14:37 2019 +0800 fix some acute bug in testing --- .../receiver/recover/SyncReceiverLogAnalyzer.java | 2 ++ .../db/sync/receiver/transfer/SyncServiceImpl.java | 25 ++++++++++------ .../db/sync/sender/conf/SyncSenderConfig.java | 5 ++-- .../sync/sender/transfer/DataTransferManager.java | 35 +++++++++++++--------- 4 files changed, 42 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java index 375c8a1..b4899ce 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java @@ -22,6 +22,7 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; +import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.sync.receiver.load.FileLoader; import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager; @@ -65,6 +66,7 @@ public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer { // check the state if (!new File(senderFolder, SyncConstant.SYNC_LOG_NAME).exists()) { new File(senderFolder, SyncConstant.LOAD_LOG_NAME).delete(); + FileUtils.deleteDirectory(new File(senderFolder, SyncConstant.RECEIVER_DATA_FOLDER_NAME)); return true; } if (FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName())) { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java index 8a6064a..a83a245 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java @@ -27,10 +27,10 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -103,7 +103,9 @@ public class SyncServiceImpl implements SyncService.Iface { if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) { currentFileWriter.get().close(); } - syncLog.get().close(); + if (syncLog.get() != null) { + syncLog.get().close(); + } return SyncReceiverLogAnalyzer.getInstance().recover(senderName.get()); } catch (IOException e) { logger.error("Check recovery state fail", e); @@ -117,7 +119,7 @@ public class SyncServiceImpl implements SyncService.Iface { initPath(); currentSG.remove(); FileLoader.createFileLoader(senderName.get(), syncFolderPath.get()); - syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), SyncConstant.SYNC_LOG_NAME))); + syncLog.set(new SyncReceiverLogger(new File(syncFolderPath.get(), SyncConstant.SYNC_LOG_NAME))); return getSuccessResult(); } catch (DiskSpaceInsufficientException | IOException e) { logger.error("Can not receiver data from sender", e); @@ -175,6 +177,7 @@ public class SyncServiceImpl implements SyncService.Iface { } else { file = new File(getSyncDataPath(), currentSG.get() + File.separatorChar + filename); } + file.delete(); currentFile.set(file); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); @@ -184,7 +187,8 @@ public class SyncServiceImpl implements SyncService.Iface { } currentFileWriter.set(new FileOutputStream(file).getChannel()); syncLog.get().startSyncTsFiles(); - } catch (IOException e) { + messageDigest.set(MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME)); + } catch (IOException | NoSuchAlgorithmException e) { logger.error("Can not init sync resource for file {}", filename, e); return getErrorResult( String.format("Can not init sync resource for file %s because %s", filename, @@ -197,6 +201,7 @@ public class SyncServiceImpl implements SyncService.Iface { public ResultStatus syncData(ByteBuffer buff) { try { currentFileWriter.get().write(buff); + buff.flip(); messageDigest.get().update(buff); } catch (IOException e) { logger.error("Can not sync data for file {}", currentFile.get().getAbsoluteFile(), e); @@ -213,7 +218,7 @@ public class SyncServiceImpl implements SyncService.Iface { try { currentFileWriter.get().close(); if (!md5OfSender.equals(md5OfReceiver)) { - FileUtils.forceDelete(currentFile.get()); + currentFile.get().delete(); currentFileWriter.set(new FileOutputStream(currentFile.get()).getChannel()); } else { if (currentFile.get().getName().endsWith(MetadataConstant.METADATA_LOG)) { @@ -241,13 +246,15 @@ public class SyncServiceImpl implements SyncService.Iface { new java.io.FileReader(currentFile.get()))) { String metadataOperation; while ((metadataOperation = br.readLine()) != null) { - operation(metadataOperation); + try { + operation(metadataOperation); + } catch (IOException | MetadataErrorException | PathErrorException e) { + // multiple insert schema, ignore it. + } } - } catch (FileNotFoundException e) { + } catch (IOException e) { logger.error("Cannot read the file {}.", currentFile.get().getAbsoluteFile(), e); return false; - } catch (IOException | MetadataErrorException | PathErrorException e) { - // multiple insert schema, ignore it. } } return true; diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java index cc6f11c..3216558 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.sync.sender.conf; import java.io.File; +import java.util.ArrayList; import java.util.List; public class SyncSenderConfig { @@ -27,7 +28,7 @@ public class SyncSenderConfig { private int serverPort = 5555; - private int syncPeriodInSecond = 10; + private int syncPeriodInSecond = 600; private String senderFolderPath; @@ -42,7 +43,7 @@ public class SyncSenderConfig { /** * Storage groups which participate in sync process */ - private List<String> storageGroupList; + private List<String> storageGroupList = new ArrayList<>(); /** * Update paths based on data directory diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java index 0952fe5..3d550a1 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java @@ -132,16 +132,20 @@ public class DataTransferManager implements IDataTransferManager { * running. */ private void verifySingleton() throws IOException { - File lockFile = new File(config.getLockFilePath()); - if (!lockFile.getParentFile().exists()) { - lockFile.getParentFile().mkdirs(); - } - if (!lockFile.exists()) { - lockFile.createNewFile(); - } - if (!lockInstance(config.getLockFilePath())) { - logger.error("Sync client is already running."); - System.exit(1); + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + for (String dataDir : dataDirs) { + config.update(dataDir); + File lockFile = new File(config.getLockFilePath()); + if (!lockFile.getParentFile().exists()) { + lockFile.getParentFile().mkdirs(); + } + if (!lockFile.exists()) { + lockFile.createNewFile(); + } + if (!lockInstance(config.getLockFilePath())) { + logger.error("Sync client is already running."); + System.exit(1); + } } } @@ -343,8 +347,9 @@ public class DataTransferManager implements IDataTransferManager { try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile())); ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) { schemaFileLinePos = 0; - while (schemaFileLinePos++ <= schemaPos) { + while (schemaFileLinePos < schemaPos) { br.readLine(); + schemaFileLinePos++; } MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME); String line; @@ -353,8 +358,9 @@ public class DataTransferManager implements IDataTransferManager { schemaFileLinePos++; byte[] singleLineData = BytesUtils.stringToBytes(line); bos.write(singleLineData); - md.update(singleLineData); + bos.write("\r\n".getBytes()); if (cntLine++ == BATCH_LINE) { + md.update(bos.toByteArray()); ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); ResultStatus status = serviceClient.syncData(buffToSend); @@ -366,6 +372,7 @@ public class DataTransferManager implements IDataTransferManager { } } if (bos.size() != 0) { + md.update(bos.toByteArray()); ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); ResultStatus status = serviceClient.syncData(buffToSend); @@ -376,7 +383,7 @@ public class DataTransferManager implements IDataTransferManager { } // check md5 - return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16)); + return checkMD5ForSchema(new BigInteger(1, md.digest()).toString(16)); } catch (NoSuchAlgorithmException | IOException | TException e) { logger.error("Can not finish transfer schema to receiver", e); return false; @@ -497,7 +504,7 @@ public class DataTransferManager implements IDataTransferManager { try { File snapshotFile = makeFileSnapshot(tsfile); // firstly sync .restore file, then sync tsfile - syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX)); + syncSingleFile(new File(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); syncSingleFile(snapshotFile); lastLocalFilesMap.get(sgName).add(tsfile); syncLog.finishSyncTsfile(tsfile);
