DL-105: Make compression stats available per stream
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a9cbb2c8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a9cbb2c8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a9cbb2c8 Branch: refs/heads/master Commit: a9cbb2c844af8985ed249f423eccd1ebeaafb476 Parents: 316fd94 Author: Yiming Zang <yz...@twitter.com> Authored: Wed Aug 10 16:18:53 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Tue Dec 27 16:49:27 2016 -0800 ---------------------------------------------------------------------- .../BKDistributedLogNamespace.java | 20 ++++++++++++++------ .../namespace/DistributedLogNamespace.java | 5 ++++- .../distributedlog/TestAsyncReaderWriter.java | 4 +++- .../service/stream/StreamImpl.java | 3 ++- 4 files changed, 23 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index 0f2c222..281c637 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -512,7 +512,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { logName, ClientSharingOption.SharedClients, Optional.<DistributedLogConfiguration>absent(), - Optional.<DynamicDistributedLogConfiguration>absent()); + Optional.<DynamicDistributedLogConfiguration>absent(), + Optional.<StatsLogger>absent()); dlm.delete(); } @@ -521,13 +522,15 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws InvalidStreamNameException, IOException { return openLog(logName, Optional.<DistributedLogConfiguration>absent(), - Optional.<DynamicDistributedLogConfiguration>absent()); + Optional.<DynamicDistributedLogConfiguration>absent(), + Optional.<StatsLogger>absent()); } @Override public DistributedLogManager openLog(String logName, Optional<DistributedLogConfiguration> logConf, - Optional<DynamicDistributedLogConfiguration> dynamicLogConf) + Optional<DynamicDistributedLogConfiguration> dynamicLogConf, + Optional<StatsLogger> perStreamStatsLogger) throws InvalidStreamNameException, IOException { validateName(logName); Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); @@ -539,7 +542,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { logName, ClientSharingOption.SharedClients, logConf, - dynamicLogConf); + dynamicLogConf, + perStreamStatsLogger); } @Override @@ -780,7 +784,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { nameOfLogStream, clientSharingOption, logConfiguration, - dynamicLogConfiguration + dynamicLogConfiguration, + Optional.<StatsLogger>absent() ); } @@ -806,7 +811,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { String nameOfLogStream, ClientSharingOption clientSharingOption, Optional<DistributedLogConfiguration> logConfiguration, - Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration) + Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration, + Optional<StatsLogger> perStreamStatsLogger) throws InvalidStreamNameException, IOException { // Make sure the name is well formed validateName(nameOfLogStream); @@ -872,6 +878,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { dlmLedgerAlloctor = this.allocator; dlmLogSegmentRollingPermitManager = this.logSegmentRollingPermitManager; } + // if there's a specified perStreamStatsLogger, user it, otherwise use the default one. + StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger); return new BKDistributedLogManager( nameOfLogStream, /* Log Name */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java index d42b5f2..b5abe9f 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java @@ -30,6 +30,8 @@ import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import java.io.IOException; import java.util.Iterator; +import org.apache.bookkeeper.stats.StatsLogger; + /** * A namespace is the basic unit for managing a set of distributedlogs. * @@ -128,7 +130,8 @@ public interface DistributedLogNamespace { */ DistributedLogManager openLog(String logName, Optional<DistributedLogConfiguration> logConf, - Optional<DynamicDistributedLogConfiguration> dynamicLogConf) + Optional<DynamicDistributedLogConfiguration> dynamicLogConf, + Optional<StatsLogger> perStreamStatsLogger) throws InvalidStreamNameException, IOException; /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 0c7f346..e5063cc 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.feature.FixedValueFeature; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -1979,7 +1980,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { dlm = namespace.openLog( name + "-custom", Optional.<DistributedLogConfiguration>absent(), - Optional.of(dynConf)); + Optional.of(dynConf), + Optional.<StatsLogger>absent()); writer = dlm.startAsyncLogSegmentNonPartitioned(); FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); segments = dlm.getLogSegments(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index 3d5b9e7..e74ebbe 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -231,7 +231,8 @@ public class StreamImpl implements Stream { private DistributedLogManager openLog(String name) throws IOException { Optional<DistributedLogConfiguration> dlConf = Optional.<DistributedLogConfiguration>absent(); Optional<DynamicDistributedLogConfiguration> dynDlConf = Optional.of(dynConf); - return dlNamespace.openLog(name, dlConf, dynDlConf); + Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger); + return dlNamespace.openLog(name, dlConf, dynDlConf, perStreamStatsLogger); } // Expensive initialization, only called once per stream.