Repository: incubator-distributedlog Updated Branches: refs/heads/master dd8ee62ae -> 0ed872353
DL-114: Add namespace watch tool Port simple namespace watch tool from downstream Author: Leigh Stewart <agrodel...@gmail.com> Author: Leigh Stewart <lstew...@twitter.com> Reviewers: Sijie Guo <si...@apache.org> Closes #80 from leighst/lstewart/dlog/watch_tool Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/0ed87235 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/0ed87235 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/0ed87235 Branch: refs/heads/master Commit: 0ed872353f879ee2f40957575c57183960545396 Parents: dd8ee62 Author: Leigh Stewart <agrodel...@gmail.com> Authored: Wed Dec 28 22:46:48 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Wed Dec 28 22:46:48 2016 -0800 ---------------------------------------------------------------------- .../tools/DistributedLogTool.java | 52 ++++++++++++++++++++ 1 file changed, 52 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0ed87235/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java index e710337..0862d54 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java @@ -55,6 +55,7 @@ import java.util.regex.Pattern; import com.twitter.distributedlog.BKDistributedLogNamespace; import com.twitter.distributedlog.Entry; +import com.twitter.distributedlog.callback.NamespaceListener; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.util.Utils; @@ -489,6 +490,56 @@ public class DistributedLogTool extends Tool { } } + public static class WatchNamespaceCommand extends PerDLCommand implements NamespaceListener { + private Set<String> currentSet = Sets.<String>newHashSet(); + private CountDownLatch doneLatch = new CountDownLatch(1); + + WatchNamespaceCommand() { + super("watch", "watch and report changes for a dl namespace"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + } + + @Override + protected String getUsage() { + return "watch [options]"; + } + + @Override + protected int runCmd() throws Exception { + watchAndReportChanges(getNamespace()); + doneLatch.await(); + return 0; + } + + @Override + public synchronized void onStreamsChanged(Iterator<String> streams) { + Set<String> updatedSet = Sets.newHashSet(streams); + Set<String> oldStreams = Sets.difference(currentSet, updatedSet); + Set<String> newStreams = Sets.difference(updatedSet, currentSet); + currentSet = updatedSet; + + System.out.println("Old streams : "); + for (String stream : oldStreams) { + System.out.println(stream); + } + + System.out.println("New streams : "); + for (String stream : newStreams) { + System.out.println(stream); + } + + System.out.println(""); + } + + protected void watchAndReportChanges(DistributedLogNamespace namespace) throws Exception { + namespace.registerNamespaceListener(this); + } + } + protected static class InspectCommand extends PerDLCommand { int numThreads = 1; @@ -2696,6 +2747,7 @@ public class DistributedLogTool extends Tool { addCommand(new TruncateStreamCommand()); addCommand(new DeserializeDLSNCommand()); addCommand(new SerializeDLSNCommand()); + addCommand(new WatchNamespaceCommand()); } @Override