This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dd684b3  Ledger manager factories initialized with max metadata version
dd684b3 is described below

commit dd684b30c734642dc1347cd931cb483de2ef4888
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Dec 6 21:02:41 2018 +0100

    Ledger manager factories initialized with max metadata version
    
    In dce4fd4 the max metadata format version was put in the
    configuration which was then used to create ledger managers. This is
    not safe though, as there are multiple paths through which
    ledger manager factories are created and some may not see this
    configuration modification. For example, on a new cluster where there
    is no preexisting layout, the layout isn't written until after the
    ledger manager factory has been created.
    
    This patch changes LedgerManagerFactory#initialize to explicitly
    require that the max ledger metadata format version is specified in
    the call.
    
    Master issue: #723
    
    Reviewers: Enrico Olivelli <[email protected]>
    
    This closes #1865 from ivankelly/explicit-max
---
 .../bookkeeper/conf/AbstractConfiguration.java     | 24 ----------------------
 .../meta/AbstractHierarchicalLedgerManager.java    |  5 +++--
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  5 +++--
 .../meta/AbstractZkLedgerManagerFactory.java       | 16 ++++++++-------
 .../apache/bookkeeper/meta/FlatLedgerManager.java  |  5 +++--
 .../bookkeeper/meta/FlatLedgerManagerFactory.java  |  7 +++++--
 .../bookkeeper/meta/HierarchicalLedgerManager.java |  9 ++++----
 .../meta/HierarchicalLedgerManagerFactory.java     |  2 +-
 .../bookkeeper/meta/LedgerManagerFactory.java      |  5 ++++-
 .../meta/LegacyHierarchicalLedgerManager.java      |  5 +++--
 .../LegacyHierarchicalLedgerManagerFactory.java    |  7 +++++--
 .../meta/LongHierarchicalLedgerManager.java        |  5 +++--
 .../meta/LongHierarchicalLedgerManagerFactory.java |  2 +-
 .../bookkeeper/meta/MSLedgerManagerFactory.java    | 12 +++++++----
 .../client/ParallelLedgerRecoveryTest.java         |  7 +++++--
 .../meta/AbstractZkLedgerManagerTest.java          | 12 ++++++-----
 .../metadata/etcd/EtcdLedgerManager.java           |  9 ++++----
 .../metadata/etcd/EtcdLedgerManagerFactory.java    |  9 ++++----
 .../metadata/etcd/EtcdMetadataDriverBase.java      |  4 +++-
 .../metadata/etcd/EtcdLedgerManagerTest.java       |  5 +++--
 .../shaded/DistributedLogCoreShadedJarTest.java    |  4 ++--
 21 files changed, 82 insertions(+), 77 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 3057bfe..7f00d09 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -85,7 +85,6 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
     protected static final String 
STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME =
             "storeSystemTimeAsLedgerUnderreplicatedMarkTime";
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = 
"storeSystemTimeAsLedgerCreationTime";
-    protected static final String MAX_LEDGER_METADATA_FORMAT_VERSION = 
"maxLedgerMetadataFormatVersion";
 
     // Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS 
is MSLedgerManagerFactory
     protected static final String METASTORE_IMPL_CLASS = "metastoreImplClass";
@@ -857,29 +856,6 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
     }
 
     /**
-     * Set the maximum format version which should be used when serializing 
ledger metadata.
-     * Setting the maximum format allows us to ensure that all clients running 
against a cluster
-     * will be able to read back all written metadata.
-     * Normally this is set when loading the layout from the metadata store. 
Users should not
-     * set this directly.
-     */
-    public T setMaxLedgerMetadataFormatVersion(int maxVersion) {
-        setProperty(MAX_LEDGER_METADATA_FORMAT_VERSION, maxVersion);
-        return getThis();
-    }
-
-    /**
-     * Get the maximum format version which should be used when serializing 
ledger metadata.
-     * The default is 2, as that was the current version when this 
functionallity was introduced.
-     *
-     * @see #setMaxLedgerMetadataFormatVersion(int)
-     * @return the maximum format version with which to serialize metadata.
-     */
-    public int getMaxLedgerMetadataFormatVersion() {
-        return getInteger(MAX_LEDGER_METADATA_FORMAT_VERSION, 2);
-    }
-
-    /**
      * Whether to preserve MDC for tasks in Executor.
      *
      * @return flag to enable/disable MDC preservation in Executor.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
index 1aa8e15..1a0ce2e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -49,8 +49,9 @@ public abstract class AbstractHierarchicalLedgerManager 
extends AbstractZkLedger
      * @param zk
      *          ZooKeeper Client Handle
      */
-    public AbstractHierarchicalLedgerManager(AbstractConfiguration conf, 
ZooKeeper zk) {
-        super(conf, zk);
+    public AbstractHierarchicalLedgerManager(AbstractConfiguration conf, 
ZooKeeper zk,
+                                             int 
maxLedgerMetadataFormatVersion) {
+        super(conf, zk, maxLedgerMetadataFormatVersion);
     }
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 97413b7..1c90154 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -159,8 +159,9 @@ public abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher
      * @param zk
      *          ZooKeeper Client Handle
      */
-    protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper 
zk) {
-        this.serDe = new 
LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
+    protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
+                                      int maxLedgerMetadataFormatVersion) {
+        this.serDe = new LedgerMetadataSerDe(maxLedgerMetadataFormatVersion);
         this.conf = conf;
         this.zk = zk;
         this.ledgerRootPath = 
ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
index 4bb2fb1..be95a53 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
@@ -160,8 +160,8 @@ public abstract class AbstractZkLedgerManagerFactory 
implements LedgerManagerFac
 
         // if layoutManager is null, return the default ledger manager
         if (layoutManager == null) {
-            return new FlatLedgerManagerFactory()
-                   .initialize(conf, null, 
FlatLedgerManagerFactory.CUR_VERSION);
+            return new FlatLedgerManagerFactory().initialize(conf, null,
+                    FlatLedgerManagerFactory.CUR_VERSION, 
LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
         }
 
         LedgerManagerFactory lmFactory;
@@ -172,13 +172,13 @@ public abstract class AbstractZkLedgerManagerFactory 
implements LedgerManagerFac
 
         if (layout == null) { // no existing layout
             lmFactory = createNewLMFactory(conf, layoutManager, factoryClass);
-            return lmFactory
-                    .initialize(conf, layoutManager, 
lmFactory.getCurrentVersion());
+            return lmFactory.initialize(conf, layoutManager,
+                                        lmFactory.getCurrentVersion(),
+                                        
LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
         }
         if (log.isDebugEnabled()) {
             log.debug("read ledger layout {}", layout);
         }
-        
conf.setMaxLedgerMetadataFormatVersion(layout.getMaxLedgerMetadataFormatVersion());
 
         // there is existing layout, we need to look into the layout.
         // handle pre V2 layout
@@ -198,7 +198,8 @@ public abstract class AbstractZkLedgerManagerFactory 
implements LedgerManagerFac
             } else {
                 throw new IOException("Unknown ledger manager type: " + 
lmType);
             }
-            return lmFactory.initialize(conf, layoutManager, 
layout.getManagerVersion());
+            return lmFactory.initialize(conf, layoutManager, 
layout.getManagerVersion(),
+                                        
layout.getMaxLedgerMetadataFormatVersion());
         }
 
         // handle V2 layout case
@@ -228,7 +229,8 @@ public abstract class AbstractZkLedgerManagerFactory 
implements LedgerManagerFac
         }
         // instantiate a factory
         lmFactory = ReflectionUtils.newInstance(factoryClass);
-        return lmFactory.initialize(conf, layoutManager, 
layout.getManagerVersion());
+        return lmFactory.initialize(conf, layoutManager, 
layout.getManagerVersion(),
+                                    
layout.getMaxLedgerMetadataFormatVersion());
     }
 
     private static String normalizedLedgerManagerFactoryClassName(String 
factoryClass,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 7ee2e22..115ffe3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -55,8 +55,9 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
      *          ZooKeeper Client Handle
      * @throws IOException when version is not compatible
      */
-    public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
-        super(conf, zk);
+    public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
+                             int maxLedgerMetadataFormatVersion) {
+        super(conf, zk, maxLedgerMetadataFormatVersion);
 
         ledgerPrefix = ledgerRootPath + "/" + StringUtils.LEDGER_NODE_PREFIX;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index 19ac418..5c92ba1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -41,6 +41,7 @@ public class FlatLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
     public static final int CUR_VERSION = 1;
 
     AbstractConfiguration conf;
+    private int maxLedgerMetadataFormatVersion;
 
     @Override
     public int getCurrentVersion() {
@@ -50,7 +51,8 @@ public class FlatLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
     @Override
     public LedgerManagerFactory initialize(final AbstractConfiguration conf,
                                            final LayoutManager layoutManager,
-                                           final int factoryVersion)
+                                           final int factoryVersion,
+                                           int maxLedgerMetadataFormatVersion)
     throws IOException {
         checkArgument(layoutManager == null || layoutManager instanceof 
ZkLayoutManager);
 
@@ -61,6 +63,7 @@ public class FlatLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
         this.conf = conf;
 
         this.zk = layoutManager == null ? null : ((ZkLayoutManager) 
layoutManager).getZk();
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
         return this;
     }
 
@@ -79,7 +82,7 @@ public class FlatLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new FlatLedgerManager(conf, zk);
+        return new FlatLedgerManager(conf, zk, maxLedgerMetadataFormatVersion);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 946ed2a..c4f211a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -46,10 +46,11 @@ class HierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
     LegacyHierarchicalLedgerManager legacyLM;
     LongHierarchicalLedgerManager longLM;
 
-    public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) 
{
-        super(conf, zk);
-        legacyLM = new LegacyHierarchicalLedgerManager(conf, zk);
-        longLM = new LongHierarchicalLedgerManager (conf, zk);
+    public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
+                                     int maxLedgerMetadataFormatVersion) {
+        super(conf, zk, maxLedgerMetadataFormatVersion);
+        legacyLM = new LegacyHierarchicalLedgerManager(conf, zk, 
maxLedgerMetadataFormatVersion);
+        longLM = new LongHierarchicalLedgerManager (conf, zk, 
maxLedgerMetadataFormatVersion);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index 4a9d6cf..c4f0f4f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -42,6 +42,6 @@ public class HierarchicalLedgerManagerFactory extends 
LegacyHierarchicalLedgerMa
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new HierarchicalLedgerManager(conf, zk);
+        return new HierarchicalLedgerManager(conf, zk, 
maxLedgerMetadataFormatVersion);
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 80d3a65..eb4e0f4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -43,12 +43,15 @@ public interface LedgerManagerFactory extends AutoCloseable 
{
      *          Layout manager used for initialize ledger manager factory
      * @param factoryVersion
      *          What version used to initialize factory.
+     * @param maxLedgerMetadataFormatVersion
+     *          Maximum format version for ledger metadata.
      * @return ledger manager factory instance
      * @throws IOException when fail to initialize the factory.
      */
     LedgerManagerFactory initialize(AbstractConfiguration conf,
                                     LayoutManager layoutManager,
-                                    int factoryVersion)
+                                    int factoryVersion,
+                                    int maxLedgerMetadataFormatVersion)
         throws IOException;
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
index 76ecc9d..b5065a8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -69,8 +69,9 @@ class LegacyHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager
      * @param zk
      *          ZooKeeper Client Handle
      */
-    public LegacyHierarchicalLedgerManager(AbstractConfiguration conf, 
ZooKeeper zk) {
-        super(conf, zk);
+    public LegacyHierarchicalLedgerManager(AbstractConfiguration conf, 
ZooKeeper zk,
+                                           int maxLedgerMetadataFormatVersion) 
{
+        super(conf, zk, maxLedgerMetadataFormatVersion);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
index 9157973..bd5c579 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -38,6 +38,7 @@ public class LegacyHierarchicalLedgerManagerFactory extends 
AbstractZkLedgerMana
     public static final int CUR_VERSION = 1;
 
     AbstractConfiguration conf;
+    int maxLedgerMetadataFormatVersion;
 
     @Override
     public int getCurrentVersion() {
@@ -47,7 +48,8 @@ public class LegacyHierarchicalLedgerManagerFactory extends 
AbstractZkLedgerMana
     @Override
     public LedgerManagerFactory initialize(final AbstractConfiguration conf,
                                            final LayoutManager lm,
-                                           final int factoryVersion)
+                                           final int factoryVersion,
+                                           int maxLedgerMetadataFormatVersion)
             throws IOException {
         checkArgument(lm instanceof ZkLayoutManager);
 
@@ -58,6 +60,7 @@ public class LegacyHierarchicalLedgerManagerFactory extends 
AbstractZkLedgerMana
                                 + factoryVersion);
         }
         this.conf = conf;
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
         this.zk = zkLayoutManager.getZk();
         return this;
     }
@@ -80,7 +83,7 @@ public class LegacyHierarchicalLedgerManagerFactory extends 
AbstractZkLedgerMana
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new LegacyHierarchicalLedgerManager(conf, zk);
+        return new LegacyHierarchicalLedgerManager(conf, zk, 
maxLedgerMetadataFormatVersion);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index 2e69e90..e464ed0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -70,8 +70,9 @@ class LongHierarchicalLedgerManager extends 
AbstractHierarchicalLedgerManager {
      * @param zk
      *            ZooKeeper Client Handle
      */
-    public LongHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper 
zk) {
-        super(conf, zk);
+    public LongHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper 
zk,
+                                         int maxLedgerMetadataFormatVersion) {
+        super(conf, zk, maxLedgerMetadataFormatVersion);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
index 93ad9dd..4c06cc4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
@@ -26,7 +26,7 @@ public class LongHierarchicalLedgerManagerFactory extends 
HierarchicalLedgerMana
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new LongHierarchicalLedgerManager(conf, zk);
+        return new LongHierarchicalLedgerManager(conf, zk, 
maxLedgerMetadataFormatVersion);
     }
 
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 38c6816..e10f274 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -96,6 +96,7 @@ public class MSLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
     public static final String META_FIELD = ".META";
 
     AbstractConfiguration conf;
+    private int maxLedgerMetadataFormatVersion;
     MetaStore metastore;
 
     @Override
@@ -106,7 +107,8 @@ public class MSLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
     @Override
     public LedgerManagerFactory initialize(final AbstractConfiguration conf,
                                            final LayoutManager layoutManager,
-                                           final int factoryVersion) throws 
IOException {
+                                           final int factoryVersion,
+                                           int maxLedgerMetadataFormatVersion) 
throws IOException {
         checkArgument(layoutManager instanceof ZkLayoutManager);
         ZkLayoutManager zkLayoutManager = (ZkLayoutManager) layoutManager;
 
@@ -115,6 +117,7 @@ public class MSLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
         }
         this.conf = conf;
         this.zk = zkLayoutManager.getZk();
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
 
         // load metadata store
         String msName = conf.getMetastoreImplClass();
@@ -280,11 +283,12 @@ public class MSLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
             }
         }
 
-        MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, 
final MetaStore metastore) {
+        MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, 
final MetaStore metastore,
+                        int maxLedgerMetadataFormatVersion) {
             this.conf = conf;
             this.zk = zk;
             this.metastore = metastore;
-            this.serDe = new 
LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
+            this.serDe = new 
LedgerMetadataSerDe(maxLedgerMetadataFormatVersion);
 
             try {
                 ledgerTable = metastore.createScannableTable(TABLE_NAME);
@@ -651,7 +655,7 @@ public class MSLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new MsLedgerManager(conf, zk, metastore);
+        return new MsLedgerManager(conf, zk, metastore, 
maxLedgerMetadataFormatVersion);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index ab966fb..1f9656b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -50,6 +50,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.exceptions.Code;
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
@@ -184,7 +185,8 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
             if (null == lmFactory) {
                 try {
                     lmFactory = new TestLedgerManagerFactory()
-                        .initialize(conf, layoutManager, 
TestLedgerManagerFactory.CUR_VERSION);
+                        .initialize(conf, layoutManager, 
TestLedgerManagerFactory.CUR_VERSION,
+                                    
LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
                 } catch (IOException e) {
                     throw new MetadataException(Code.METADATA_SERVICE_ERROR, 
e);
                 }
@@ -200,7 +202,8 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
             if (null == lmFactory) {
                 try {
                     lmFactory = new TestLedgerManagerFactory()
-                        .initialize(conf, layoutManager, 
TestLedgerManagerFactory.CUR_VERSION);
+                        .initialize(conf, layoutManager, 
TestLedgerManagerFactory.CUR_VERSION,
+                                    
LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
                 } catch (IOException e) {
                     throw new MetadataException(Code.METADATA_SERVICE_ERROR, 
e);
                 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 20a8d51..620c3b2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -110,9 +110,9 @@ public class AbstractZkLedgerManagerTest extends 
MockZooKeeperTestCase {
 
         this.conf = new ClientConfiguration();
         this.ledgerManager = mock(
-            AbstractZkLedgerManager.class,
-            withSettings()
-                .useConstructor(conf, mockZk)
+                AbstractZkLedgerManager.class,
+                withSettings()
+                .useConstructor(conf, mockZk, 
LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION)
                 .defaultAnswer(CALLS_REAL_METHODS));
         List<BookieSocketAddress> ensemble = Lists.newArrayList(
                 new BookieSocketAddress("192.0.2.1", 3181),
@@ -303,13 +303,15 @@ public class AbstractZkLedgerManagerTest extends 
MockZooKeeperTestCase {
 
     @Test
     public void testRemoveLedgerMetadataHierarchical() throws Exception {
-        HierarchicalLedgerManager hlm = new HierarchicalLedgerManager(conf, 
mockZk);
+        HierarchicalLedgerManager hlm = new HierarchicalLedgerManager(conf, 
mockZk,
+                LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
         testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
     }
 
     @Test
     public void testRemoveLedgerMetadataLongHierarchical() throws Exception {
-        LongHierarchicalLedgerManager hlm = new 
LongHierarchicalLedgerManager(conf, mockZk);
+        LongHierarchicalLedgerManager hlm = new 
LongHierarchicalLedgerManager(conf, mockZk,
+                LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
         testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
     }
 
diff --git 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index e20a9ba..ffbbc08 100644
--- 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++ 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -45,7 +45,6 @@ import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.metadata.etcd.helpers.KeyIterator;
@@ -79,14 +78,14 @@ class EtcdLedgerManager implements LedgerManager {
 
     private volatile boolean closed = false;
 
-    EtcdLedgerManager(AbstractConfiguration conf,
-                      Client client,
-                      String scope) {
+    EtcdLedgerManager(Client client,
+                      String scope,
+                      int maxLedgerMetadataFormatVersion) {
         this.client = client;
         this.kvClient = client.getKVClient();
         this.scope = scope;
         this.watchClient = new EtcdWatchClient(client);
-        this.serDe = new 
LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
+        this.serDe = new LedgerMetadataSerDe(maxLedgerMetadataFormatVersion);
 
         this.ledgerMetadataFunction = bs -> {
             try {
diff --git 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
index 9c19df0..0bc16d8 100644
--- 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
+++ 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
@@ -42,7 +42,7 @@ class EtcdLedgerManagerFactory implements 
LedgerManagerFactory {
 
     private String scope;
     private Client client;
-    private AbstractConfiguration conf;
+    private int maxLedgerMetadataFormatVersion;
 
     @Override
     public int getCurrentVersion() {
@@ -52,7 +52,8 @@ class EtcdLedgerManagerFactory implements 
LedgerManagerFactory {
     @Override
     public LedgerManagerFactory initialize(AbstractConfiguration conf,
                                            LayoutManager layoutManager,
-                                           int factoryVersion) throws 
IOException {
+                                           int factoryVersion,
+                                           int maxLedgerMetadataFormatVersion) 
throws IOException {
         checkArgument(layoutManager instanceof EtcdLayoutManager);
 
         EtcdLayoutManager etcdLayoutManager = (EtcdLayoutManager) 
layoutManager;
@@ -67,7 +68,7 @@ class EtcdLedgerManagerFactory implements 
LedgerManagerFactory {
             throw new IOException("Invalid metadata service uri", e);
         }
         this.client = etcdLayoutManager.getClient();
-        this.conf = conf;
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
         return this;
     }
 
@@ -84,7 +85,7 @@ class EtcdLedgerManagerFactory implements LedgerManagerFactory {
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new EtcdLedgerManager(conf, client, scope);
+        return new EtcdLedgerManager(client, scope, maxLedgerMetadataFormatVersion);
     }
 
     @Override
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataDriverBase.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataDriverBase.java
index 47b034c..cbd6f32 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataDriverBase.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdMetadataDriverBase.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.LayoutManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.meta.exceptions.Code;
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -106,7 +107,8 @@ class EtcdMetadataDriverBase implements AutoCloseable {
         if (null == lmFactory) {
             try {
                 lmFactory = new EtcdLedgerManagerFactory();
-                lmFactory.initialize(conf, layoutManager, EtcdLedgerManagerFactory.VERSION);
+                lmFactory.initialize(conf, layoutManager, EtcdLedgerManagerFactory.VERSION,
+                                     LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
             } catch (IOException ioe) {
                 throw new MetadataException(
                     Code.METADATA_SERVICE_ERROR, "Failed to initialize ledger manager factory", ioe);
diff --git a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
index 27dc84c..30877f7 100644
--- a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
+++ b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
@@ -53,9 +53,9 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.metadata.etcd.helpers.ValueStream;
 import org.apache.bookkeeper.metadata.etcd.testing.EtcdTestBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -82,7 +82,8 @@ public class EtcdLedgerManagerTest extends EtcdTestBase {
     public void setUp() throws Exception {
         super.setUp();
         this.scope = RandomStringUtils.randomAlphabetic(8);
-        this.lm = new EtcdLedgerManager(new ClientConfiguration(), etcdClient, scope);
+        this.lm = new EtcdLedgerManager(etcdClient, scope,
+                                        LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
     }
 
     @Override
diff --git a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
index 6432c30..26230ec 100644
--- a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
+++ b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java
@@ -179,7 +179,7 @@ public class DistributedLogCoreShadedJarTest {
         when(manager.readLedgerLayout()).thenReturn(layout);
 
         LedgerManagerFactory factory = mock(LedgerManagerFactory.class);
-        when(factory.initialize(any(AbstractConfiguration.class), same(manager), anyInt()))
+        when(factory.initialize(any(AbstractConfiguration.class), same(manager), anyInt(), anyInt()))
             .thenReturn(factory);
         PowerMockito.mockStatic(ReflectionUtils.class);
         when(ReflectionUtils.newInstance(any(Class.class)))
@@ -191,7 +191,7 @@ public class DistributedLogCoreShadedJarTest {
             if (allowShaded) {
                 assertSame(factory, result);
                 verify(factory, times(1))
-                    .initialize(any(AbstractConfiguration.class), same(manager), anyInt());
+                    .initialize(any(AbstractConfiguration.class), same(manager), anyInt(), anyInt());
             } else {
                 fail("Should fail to instantiate ledger manager factory if allowShaded is false");
             }

Reply via email to