This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch add_checking_tool in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 966a13f7a1019344ab1e4afd0af4b0a9f1b3b750 Author: jt <[email protected]> AuthorDate: Sun Mar 3 11:32:58 2019 +0800 add wal checker --- .../SysCheckException.java} | 23 +++-- .../java/org/apache/iotdb/db/tools/WalChecker.java | 108 +++++++++++++++++++++ .../apache/iotdb/db/writelog/io/ILogReader.java | 7 +- .../apache/iotdb/db/writelog/io/RAFLogReader.java | 45 ++++----- .../recover/ExclusiveLogRecoverPerformer.java | 18 +++- 5 files changed, 160 insertions(+), 41 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/SysCheckException.java similarity index 69% copy from iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java copy to iotdb/src/main/java/org/apache/iotdb/db/exception/SysCheckException.java index e0d5275..640acae 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/SysCheckException.java @@ -16,16 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.writelog.io; -import java.io.File; -import java.io.FileNotFoundException; -import java.util.Iterator; -import org.apache.iotdb.db.qp.physical.PhysicalPlan; +package org.apache.iotdb.db.exception; -public interface ILogReader extends Iterator<PhysicalPlan> { +public class SysCheckException extends Exception { - void open(File file) throws FileNotFoundException; + public SysCheckException() { + } - void close(); + public SysCheckException(String message) { + super(message); + } + + public SysCheckException(String message, Throwable cause) { + super(message, cause); + } + + public SysCheckException(Throwable cause) { + super(cause); + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/iotdb/src/main/java/org/apache/iotdb/db/tools/WalChecker.java new file mode 100644 index 0000000..ed96f25 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/tools/WalChecker.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.tools; + +import static org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode.WAL_FILE_NAME; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.exception.SysCheckException; +import org.apache.iotdb.db.writelog.io.RAFLogReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * WalChecker verifies that whether all write ahead logs in the WAL folder are recognizable. + */ +public class WalChecker { + + private static final Logger LOGGER = LoggerFactory.getLogger(WalChecker.class); + + /** + * the root dir of wals, which should have wal directories of storage groups as its children. + */ + private String walFolder; + + public WalChecker(String walFolder) { + this.walFolder = walFolder; + } + + public void doCheck() throws SysCheckException { + File walFolderFile = new File(walFolder); + if(!walFolderFile.exists() || !walFolderFile.isDirectory()) { + throw new SysCheckException(String.format("%s is not a directory", walFolder)); + } + + File[] storageWalFolders = walFolderFile.listFiles(); + if (storageWalFolders == null || storageWalFolders.length == 0) { + LOGGER.info("No sub-directories under the given directory, check ends"); + return; + } + + List<File> failedFile = new ArrayList<>(); + for (int dirIndex = 0; dirIndex < storageWalFolders.length; dirIndex++) { + File storageWalFolder = storageWalFolders[dirIndex]; + LOGGER.debug("Checking the No.{} directory {}", dirIndex, storageWalFolder.getName()); + File walFile = new File(storageWalFolder, WAL_FILE_NAME); + if (!walFile.exists()) { + LOGGER.debug("No wal file in this dir, skipping"); + continue; + } + + RAFLogReader logReader = null; + try { + logReader = new RAFLogReader(walFile); + while (logReader.hasNext()) { + logReader.next(); + } + } catch (IOException e) { + failedFile.add(walFile); + LOGGER.error("{} fails the check because", walFile.getAbsoluteFile(), e); + } finally { + if( logReader != null) { + logReader.close(); + } + } + } + + if (failedFile.isEmpty()) { + LOGGER.info("Check finished. There is no damaged file"); + } else { + LOGGER.error("There are {} files failed the check. They are {}", failedFile.size(), failedFile); + } + } + + /** + * + * @param args walRootDirectory + */ + public static void main(String[] args) throws SysCheckException { + if (args.length < 1) { + LOGGER.error("No enough args: requires the walRootDirectory"); + return; + } + + WalChecker checker = new WalChecker(args[0]); + checker.doCheck(); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java index e0d5275..55e30a8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java @@ -20,12 +20,17 @@ package org.apache.iotdb.db.writelog.io; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Iterator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -public interface ILogReader extends Iterator<PhysicalPlan> { +public interface ILogReader { void open(File file) throws FileNotFoundException; void close(); + + boolean hasNext() throws IOException; + + PhysicalPlan next() throws IOException; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java index 42e320e..6700805 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java @@ -48,42 +48,33 @@ public class RAFLogReader implements ILogReader { } @Override - public boolean hasNext() { + public boolean hasNext() throws IOException{ if (planBuffer != null) { return true; } - try { - if (logRaf.getFilePointer() + 12 > logRaf.length()) { - return false; - } - } catch (IOException e) { - logger.error("Cannot read from log file {}", filepath, e); + + if (logRaf.getFilePointer() + 12 > logRaf.length()) { return false; } - try { - int logSize = logRaf.readInt(); - if (logSize > bufferSize) { - bufferSize = logSize; - buffer = new byte[bufferSize]; - } - final long checkSum = logRaf.readLong(); - logRaf.read(buffer, 0, logSize); - checkSummer.reset(); - checkSummer.update(buffer, 0, logSize); - if (checkSummer.getValue() != checkSum) { - return false; - } - PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(buffer); - planBuffer = plan; - return true; - } catch (IOException e) { - logger.error("Cannot read log file {}", filepath, e); - return false; + + int logSize = logRaf.readInt(); + if (logSize > bufferSize) { + bufferSize = logSize; + buffer = new byte[bufferSize]; + } + final long checkSum = logRaf.readLong(); + logRaf.read(buffer, 0, logSize); + checkSummer.reset(); + checkSummer.update(buffer, 0, logSize); + if (checkSummer.getValue() != checkSum) { + throw new IOException("The check sum is incorrect!"); } + planBuffer = PhysicalPlanLogTransfer.logToOperator(buffer); + return true; } @Override - public PhysicalPlan next() { + public PhysicalPlan next() throws IOException { if (!hasNext()){ throw new NoSuchElementException(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java index 56db00a..15e88dd 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java @@ -259,15 +259,15 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer { replayLog(); } - private int replayLogFile(File logFile) throws RecoverException { + private int replayLogFile(File logFile) throws RecoverException, IOException { int failedCnt = 0; if (logFile.exists()) { try { rafLogReader.open(logFile); } catch (FileNotFoundException e) { logger - .error("Log node {} cannot read old log file, because {}", writeLogNode.getIdentifier(), - e.getMessage()); + .error("Log node {} cannot read old log file, because ", writeLogNode.getIdentifier(), + e); throw new RecoverException("Cannot read old log file, recovery aborted."); } while (rafLogReader.hasNext()) { @@ -294,11 +294,19 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer { File oldLogFile = new File( writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME + ExclusiveWriteLogNode.OLD_SUFFIX); - failedEntryCnt += replayLogFile(oldLogFile); + try { + failedEntryCnt += replayLogFile(oldLogFile); + } catch (IOException e) { + throw new RecoverException(e); + } // then replay new log File newLogFile = new File( writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME); - failedEntryCnt += replayLogFile(newLogFile); + try { + failedEntryCnt += replayLogFile(newLogFile); + } catch (IOException e) { + throw new RecoverException(e); + } // TODO : do we need to proceed if there are failed logs ? if (failedEntryCnt > 0) { throw new RecoverException(
