This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch fix_sync_schema_bug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 128e1914d6d259db1c5504688ba84c05c07ddcde Author: lta <[email protected]> AuthorDate: Wed Dec 23 00:30:16 2020 +0800 fix sync schema bug --- .../db/sync/receiver/transfer/SyncServiceImpl.java | 5 ++- .../iotdb/db/sync/sender/transfer/SyncClient.java | 45 ++++++---------------- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java index 7d8df48..3572bac 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java @@ -270,10 +270,11 @@ public class SyncServiceImpl implements SyncService.Iface { private void loadMetadata() { logger.info("Start to load metadata in sync process."); if (currentFile.get().exists()) { - try (MLogReader mLogReader = new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG)) { + try (MLogReader mLogReader = new MLogReader(currentFile.get())) { while (mLogReader.hasNext()) { - PhysicalPlan plan = mLogReader.next(); + PhysicalPlan plan = null; try { + plan = mLogReader.next(); if (plan == null) { continue; } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java index 838d74e..5e347f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java @@ -72,7 +72,6 @@ import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.service.sync.thrift.ConfirmInfo; import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.iotdb.service.sync.thrift.SyncStatus; -import org.apache.iotdb.tsfile.utils.BytesUtils; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; @@ -89,8 +88,6 @@ public class SyncClient implements ISyncClient { private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig(); - private static final int BATCH_LINE = 1000; - private static final int TIMEOUT_MS = 1000; /** @@ -99,7 +96,7 @@ public class SyncClient implements ISyncClient { * location is recorded once after each synchronization task for the next synchronization task to * use. */ - private int schemaFileLinePos; + private int schemaFilePos; private TTransport transport; @@ -365,45 +362,25 @@ public class SyncClient implements ISyncClient { } private boolean tryToSyncSchema() { - int schemaPos = readSyncSchemaPos(getSchemaPosFile()); + schemaFilePos = readSyncSchemaPos(getSchemaPosFile()); // start to sync file data and get digest of this file. - try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile())); + try (FileInputStream fis = new FileInputStream(getSchemaLogFile()); ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) { - schemaFileLinePos = 0; - String line; - while (schemaFileLinePos < schemaPos) { - line = br.readLine(); - schemaFileLinePos++; - } + fis.skip(schemaFilePos); MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME); - int cntLine = 0; - while ((line = br.readLine()) != null) { - schemaFileLinePos++; - byte[] singleLineData = BytesUtils.stringToBytes(line); - bos.write(singleLineData); - bos.write("\r\n".getBytes()); - if (cntLine++ == BATCH_LINE) { - md.update(bos.toByteArray()); - ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); - bos.reset(); - SyncStatus status = serviceClient.syncData(buffToSend); - if (status.code != SUCCESS_CODE) { - logger.error("Receiver failed to receive metadata because {}, retry.", status.msg); - return false; - } - cntLine = 0; - } - } - if (bos.size() != 0) { - md.update(bos.toByteArray()); + byte[] buffer = new byte[SyncConstant.DATA_CHUNK_SIZE]; + int dataLength; + while ((dataLength = fis.read(buffer)) != -1) { + bos.write(buffer, 0, dataLength); + md.update(buffer, 0, dataLength); ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); - bos.reset(); SyncStatus status = serviceClient.syncData(buffToSend); if (status.code != SUCCESS_CODE) { logger.error("Receiver failed to receive metadata because {}, retry.", status.msg); return false; } + schemaFilePos += dataLength; } // check digest @@ -453,7 +430,7 @@ public class SyncClient implements ISyncClient { syncSchemaLogFile.createNewFile(); } try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) { - br.write(Integer.toString(schemaFileLinePos)); + br.write(Integer.toString(schemaFilePos)); } } catch (IOException e) { logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
