This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
commit cbdf7bda1ad749c846af8aba5afeac28efb96c65 Author: Josh Elser <els...@apache.org> AuthorDate: Wed Apr 10 16:25:36 2019 -0400 RATIS-525. Usability improvements to VerificationTool * Add lots of new options * Fix LOG/stdout use * Remove logs if they already exist before writing --- .../ratis/logservice/tool/VerificationTool.java | 149 ++++++++++++++------- 1 file changed, 100 insertions(+), 49 deletions(-) diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java index 5fcc841..cf8216d 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java @@ -21,58 +21,102 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.*; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.ratis.logservice.api.LogInfo; import org.apache.ratis.logservice.api.LogName; import org.apache.ratis.logservice.api.LogReader; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogWriter; import org.apache.ratis.logservice.client.LogServiceClient; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; import org.apache.ratis.logservice.server.LogStateMachine; -import org.jline.utils.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; + public class VerificationTool { public static final Logger LOG = LoggerFactory.getLogger(LogStateMachine.class); @Parameter(names = {"-q", "--metaQuorum"}, description = "Metadata Service 
Quorum", required = true) private String metaQuorum; - public static String LOG_NAME_PREFIX = "testlog"; - public static String MESSAGE_PREFIX = "message"; - + @Parameter(names = {"-nl", "--numLogs"}, description = "Number of logs") private int numLogs = 10; + @Parameter(names = {"-nr", "--numRecords"}, description = "Number of records per log") private int numRecords = 1000; - - public static void main(String[] args) { + @Parameter(names = {"-w", "--write"}, description = "Write to the logs") + private boolean write = true; + @Parameter(names = {"-r", "--read"}, description = "Read the logs") + private boolean read = true; + @Parameter(names = {"-l", "--logFrequency"}, description = "Print update every N operations") + private int logFrequency = 50; + @Parameter(names = {"-h", "--help"}, description = "Help", help = true) + private boolean help = false; + + public static final String LOG_NAME_PREFIX = "testlog"; + public static final String MESSAGE_PREFIX = "message"; + + public static void main(String[] args) throws IOException { VerificationTool tool = new VerificationTool(); - JCommander.newBuilder() + JCommander jc = JCommander.newBuilder() .addObject(tool) - .build() - .parse(args); + .build(); + + jc.parse(args); + if (tool.help) { + jc.usage(); + return; + } System.out.println(tool.metaQuorum); LogServiceClient client = new LogServiceClient(tool.metaQuorum); ExecutorService executor = Executors.newCachedThreadPool(); List<Future<?>> futures = new ArrayList<Future<?>>(tool.numLogs); - for (int i = 0; i < tool.numLogs; i++) { - BulkWriter writer = new BulkWriter(LOG_NAME_PREFIX + i, client, tool.numRecords); - futures.add(executor.submit(writer)); + + if (tool.write) { + LOG.info("Executing parallel writes"); + // Delete any logs that already exist first + final Set<LogName> logsInSystem = new HashSet<>(); + List<LogInfo> listOfLogs = client.listLogs(); + for (LogInfo logInfo : listOfLogs) { + logsInSystem.add(logInfo.getLogName()); + } + + LOG.info("Observed 
logs already in system: {}", logsInSystem); + for (int i = 0; i < tool.numLogs; i++) { + LogName logName = getLogName(i); + if (logsInSystem.contains(logName)) { + LOG.info("Deleting {}", logName); + client.deleteLog(logName); + } + BulkWriter writer = new BulkWriter(getLogName(i), client, tool.numRecords, tool.logFrequency); + futures.add(executor.submit(writer)); + } + waitForCompletion(futures); } - waitForCompletion(futures); - futures = new ArrayList<Future<?>>(tool.numLogs); - for (int i = 0; i < tool.numLogs; i++) { - BulkReader reader = new BulkReader(LOG_NAME_PREFIX + i, client, tool.numRecords); - futures.add(executor.submit(reader)); + + if (tool.read) { + LOG.info("Executing parallel reads"); + futures = new ArrayList<Future<?>>(tool.numLogs); + for (int i = 0; i < tool.numLogs; i++) { + BulkReader reader = new BulkReader(getLogName(i), client, tool.numRecords, tool.logFrequency); + futures.add(executor.submit(reader)); + } + waitForCompletion(futures); } - waitForCompletion(futures); executor.shutdownNow(); } + private static LogName getLogName(int id) { + return LogName.of(LOG_NAME_PREFIX + id); + } + private static void waitForCompletion(List<Future<?>> futures) { for (Future<?> future : futures) { try { @@ -85,56 +129,61 @@ public class VerificationTool { LOG.error("Got exception ", e); System.exit(-1); } - } + LOG.info("All operations finished"); } - static class BulkWriter implements Runnable { - private String logName; - private LogServiceClient logServiceClient; - private int numRecords; + static abstract class Operation implements Runnable { + final LogName logName; + final LogServiceClient client; + final int numRecords; + final int logFreq; + + Operation(LogName logName, LogServiceClient client, int numRecords, int logFreq) { + this.logName = logName; + this.client = client; + this.numRecords = numRecords; + this.logFreq = logFreq; + } + } - BulkWriter(String logName, LogServiceClient logServiceClient, int numRecords) { - this.logName = 
logName; - this.logServiceClient = logServiceClient; - this.numRecords = numRecords; + static class BulkWriter extends Operation { + BulkWriter(LogName logName, LogServiceClient client, int numRecords, int logFreq) { + super(logName, client, numRecords, logFreq); } public void run() { try { - LogStream logStream = this.logServiceClient.createLog(LogName.of(logName)); + LOG.info("Creating {}", logName); + LogStream logStream = this.client.createLog(logName); LogWriter writer = logStream.createWriter(); for (int i = 0; i < this.numRecords; i++) { String message = MESSAGE_PREFIX + i; - System.out.println(logName + " Writing " + message); + if (i % logFreq == 0) { + LOG.info(logName + " Writing " + message); + } writer.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8))); } writer.close(); - Log.info("" + numRecords + "log entries written to log "+ this.logName + " successfully."); + LOG.info("{} log entries written to log {} successfully.", numRecords, logName); } catch (IOException e) { throw new RuntimeException(e); } } } - static class BulkReader implements Runnable { - private String logName; - private LogServiceClient logServiceClient; - private int numRecords; - - BulkReader(String logName, LogServiceClient logServiceClient, int numRecords) { - this.logName = logName; - this.logServiceClient = logServiceClient; - this.numRecords = numRecords; - } + static class BulkReader extends Operation { + BulkReader(LogName logName, LogServiceClient client, int numRecords, int logFreq) { + super(logName, client, numRecords, logFreq); + } public void run() { try { - LogStream logStream = this.logServiceClient.getLog(LogName.of(logName)); + LogStream logStream = this.client.getLog(logName); LogReader reader = logStream.createReader(); long size = logStream.getLength(); if(size != this.numRecords) { - Log.error("There is mismatch is number of records. Expected Records: "+ + LOG.error("There is mismatch is number of records. 
Expected Records: "+ this.numRecords +", Actual Records: " + size); System.exit(-1); } @@ -142,14 +191,16 @@ public class VerificationTool { ByteBuffer buffer = reader.readNext(); String message = new String(buffer.array(), buffer.arrayOffset(), buffer.remaining(), StandardCharsets.UTF_8); - System.out.println(logName + " Read " + message); + if (i % logFreq == 0) { + LOG.info(logName + " Read " + message); + } if(!message.equals(MESSAGE_PREFIX + i)) { - Log.error("Message is not correct. Expected: "+(MESSAGE_PREFIX + i) + LOG.error("Message is not correct. Expected: "+(MESSAGE_PREFIX + i) +". Actual:" +message); System.exit(-1); } } - Log.info("" + numRecords + " log entries read from log "+ this.logName + " successfully."); + LOG.info("{} log entries read from log {} successfully.", numRecords, logName); reader.close(); } catch (IOException e) { throw new RuntimeException(e);