DL-105: Make compression stats available per stream

Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a9cbb2c8
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a9cbb2c8
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a9cbb2c8

Branch: refs/heads/master
Commit: a9cbb2c844af8985ed249f423eccd1ebeaafb476
Parents: 316fd94
Author: Yiming Zang <yz...@twitter.com>
Authored: Wed Aug 10 16:18:53 2016 -0700
Committer: Sijie Guo <sij...@twitter.com>
Committed: Tue Dec 27 16:49:27 2016 -0800

----------------------------------------------------------------------
 .../BKDistributedLogNamespace.java              | 20 ++++++++++++++------
 .../namespace/DistributedLogNamespace.java      |  5 ++++-
 .../distributedlog/TestAsyncReaderWriter.java   |  4 +++-
 .../service/stream/StreamImpl.java              |  3 ++-
 4 files changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0f2c222..281c637 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -512,7 +512,8 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
                 logName,
                 ClientSharingOption.SharedClients,
                 Optional.<DistributedLogConfiguration>absent(),
-                Optional.<DynamicDistributedLogConfiguration>absent());
+                Optional.<DynamicDistributedLogConfiguration>absent(),
+                Optional.<StatsLogger>absent());
         dlm.delete();
     }
 
@@ -521,13 +522,15 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             throws InvalidStreamNameException, IOException {
         return openLog(logName,
                 Optional.<DistributedLogConfiguration>absent(),
-                Optional.<DynamicDistributedLogConfiguration>absent());
+                Optional.<DynamicDistributedLogConfiguration>absent(),
+                Optional.<StatsLogger>absent());
     }
 
     @Override
     public DistributedLogManager openLog(String logName,
                                          Optional<DistributedLogConfiguration> 
logConf,
-                                         
Optional<DynamicDistributedLogConfiguration> dynamicLogConf)
+                                         
Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+                                         Optional<StatsLogger> 
perStreamStatsLogger)
             throws InvalidStreamNameException, IOException {
         validateName(logName);
         Optional<URI> uri = 
FutureUtils.result(metadataStore.getLogLocation(logName));
@@ -539,7 +542,8 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
                 logName,
                 ClientSharingOption.SharedClients,
                 logConf,
-                dynamicLogConf);
+                dynamicLogConf,
+                perStreamStatsLogger);
     }
 
     @Override
@@ -780,7 +784,8 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
                 nameOfLogStream,
                 clientSharingOption,
                 logConfiguration,
-                dynamicLogConfiguration
+                dynamicLogConfiguration,
+                Optional.<StatsLogger>absent()
         );
     }
 
@@ -806,7 +811,8 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             String nameOfLogStream,
             ClientSharingOption clientSharingOption,
             Optional<DistributedLogConfiguration> logConfiguration,
-            Optional<DynamicDistributedLogConfiguration> 
dynamicLogConfiguration)
+            Optional<DynamicDistributedLogConfiguration> 
dynamicLogConfiguration,
+            Optional<StatsLogger> perStreamStatsLogger)
         throws InvalidStreamNameException, IOException {
         // Make sure the name is well formed
         validateName(nameOfLogStream);
@@ -872,6 +878,8 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             dlmLedgerAlloctor = this.allocator;
             dlmLogSegmentRollingPermitManager = 
this.logSegmentRollingPermitManager;
         }
+        // if there's a specified perStreamStatsLogger, user it, otherwise use 
the default one.
+        StatsLogger perLogStatsLogger = 
perStreamStatsLogger.or(this.perLogStatsLogger);
 
         return new BKDistributedLogManager(
                 nameOfLogStream,                    /* Log Name */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
index d42b5f2..b5abe9f 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
@@ -30,6 +30,8 @@ import 
com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.bookkeeper.stats.StatsLogger;
+
 /**
  * A namespace is the basic unit for managing a set of distributedlogs.
  *
@@ -128,7 +130,8 @@ public interface DistributedLogNamespace {
      */
     DistributedLogManager openLog(String logName,
                                   Optional<DistributedLogConfiguration> 
logConf,
-                                  Optional<DynamicDistributedLogConfiguration> 
dynamicLogConf)
+                                  Optional<DynamicDistributedLogConfiguration> 
dynamicLogConf,
+                                  Optional<StatsLogger> perStreamStatsLogger)
             throws InvalidStreamNameException, IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 0c7f346..e5063cc 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.feature.FixedValueFeature;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -1979,7 +1980,8 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
         dlm = namespace.openLog(
                 name + "-custom",
                 Optional.<DistributedLogConfiguration>absent(),
-                Optional.of(dynConf));
+                Optional.of(dynConf),
+                Optional.<StatsLogger>absent());
         writer = dlm.startAsyncLogSegmentNonPartitioned();
         FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         segments = dlm.getLogSegments();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 3d5b9e7..e74ebbe 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -231,7 +231,8 @@ public class StreamImpl implements Stream {
     private DistributedLogManager openLog(String name) throws IOException {
         Optional<DistributedLogConfiguration> dlConf = 
Optional.<DistributedLogConfiguration>absent();
         Optional<DynamicDistributedLogConfiguration> dynDlConf = 
Optional.of(dynConf);
-        return dlNamespace.openLog(name, dlConf, dynDlConf);
+        Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger);
+        return dlNamespace.openLog(name, dlConf, dynDlConf, 
perStreamStatsLogger);
     }
 
     // Expensive initialization, only called once per stream.

Reply via email to