This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dce4fd4  Add max ledger metadata format version to layout
dce4fd4 is described below

commit dce4fd4c16e5423a76a4be7aa76b23a52335c7fa
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Dec 4 12:03:14 2018 +0100

    Add max ledger metadata format version to layout
    
    The max ledger metadata format version is the maximum format version
    that will be used to write ledger metadata. By setting it in the
    ledger layout it becomes a cluster-wide configuration which is
    initialized along with the cluster. Any cluster initialized with the
    current code will end up with version 2. For this cluster serde will
    only ever serialize with up to version 2 of the ledger metadata
    format, so all clients that understand version 2 will continue to
    work, even as the software version increases and new metadata formats
    become available (such as the binary metadata format coming soon).
    
    Currently there is no handling in LedgerMetadataSerDe, because we
    don't have a new version, but when there is, it will use
    ```
    Math.min(maxLedgerMetadataFormatVersion,
             metadata.getMetadataFormatVersion())
    ```
    to decide which serialization method to use.
    
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1858 from ivankelly/format-version
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  7 +++-
 .../bookkeeper/conf/AbstractConfiguration.java     | 24 ++++++++++++
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  2 +-
 .../meta/AbstractZkLedgerManagerFactory.java       |  1 +
 .../org/apache/bookkeeper/meta/LedgerLayout.java   | 43 ++++++++++++++++++----
 .../bookkeeper/meta/LedgerMetadataSerDe.java       |  6 +++
 .../bookkeeper/meta/MSLedgerManagerFactory.java    |  2 +-
 .../server/http/service/GetLedgerMetaService.java  |  2 +-
 .../server/http/service/ListLedgerService.java     |  2 +-
 .../bookkeeper/client/LedgerMetadataTest.java      |  6 ++-
 .../meta/AbstractZkLedgerManagerTest.java          |  2 +-
 .../apache/bookkeeper/meta/MockLedgerManager.java  |  2 +-
 .../apache/bookkeeper/meta/TestLedgerLayout.java   | 43 ++++++++++++++++++++++
 .../bookkeeper/server/http/TestHttpService.java    |  3 +-
 .../metadata/etcd/EtcdLedgerManager.java           | 30 ++++++++-------
 .../metadata/etcd/EtcdLedgerManagerFactory.java    |  4 +-
 .../metadata/etcd/EtcdLedgerManagerTest.java       |  3 +-
 17 files changed, 147 insertions(+), 35 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 9108ef3..9c6180b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -1106,7 +1106,7 @@ public class BookieShell implements Tool {
     void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean 
printMeta) {
         System.out.println("ledgerID: " + 
ledgerIdFormatter.formatLedgerId(ledgerId));
         if (printMeta) {
-            System.out.println(new String(new 
LedgerMetadataSerDe().serialize(md), UTF_8));
+            System.out.println(md.toString());
         }
     }
 
@@ -1115,7 +1115,10 @@ public class BookieShell implements Tool {
      */
     class LedgerMetadataCmd extends MyCommand {
         Options lOpts = new Options();
-        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        // the max version won't actually take effect as this tool
+        // never creates new metadata (there'll already be a format version in 
the existing metadata)
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe(
+                LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
 
         LedgerMetadataCmd() {
             super(CMD_LEDGERMETADATA);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 7f00d09..3057bfe 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -85,6 +85,7 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
     protected static final String 
STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME =
             "storeSystemTimeAsLedgerUnderreplicatedMarkTime";
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = 
"storeSystemTimeAsLedgerCreationTime";
+    protected static final String MAX_LEDGER_METADATA_FORMAT_VERSION = 
"maxLedgerMetadataFormatVersion";
 
     // Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS 
is MSLedgerManagerFactory
     protected static final String METASTORE_IMPL_CLASS = "metastoreImplClass";
@@ -856,6 +857,29 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
     }
 
     /**
+     * Set the maximum format version which should be used when serializing 
ledger metadata.
+     * Setting the maximum format allows us to ensure that all clients running 
against a cluster
+     * will be able to read back all written metadata.
+     * Normally this is set when loading the layout from the metadata store. 
Users should not
+     * set this directly.
+     */
+    public T setMaxLedgerMetadataFormatVersion(int maxVersion) {
+        setProperty(MAX_LEDGER_METADATA_FORMAT_VERSION, maxVersion);
+        return getThis();
+    }
+
+    /**
+     * Get the maximum format version which should be used when serializing 
ledger metadata.
+     * The default is 2, as that was the current version when this 
functionallity was introduced.
+     *
+     * @see #setMaxLedgerMetadataFormatVersion(int)
+     * @return the maximum format version with which to serialize metadata.
+     */
+    public int getMaxLedgerMetadataFormatVersion() {
+        return getInteger(MAX_LEDGER_METADATA_FORMAT_VERSION, 2);
+    }
+
+    /**
      * Whether to preserve MDC for tasks in Executor.
      *
      * @return flag to enable/disable MDC preservation in Executor.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 5dbdd06..97413b7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -160,7 +160,7 @@ public abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher
      *          ZooKeeper Client Handle
      */
     protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper 
zk) {
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new 
LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
         this.conf = conf;
         this.zk = zk;
         this.ledgerRootPath = 
ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
index 72bc3e8..4bb2fb1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
@@ -178,6 +178,7 @@ public abstract class AbstractZkLedgerManagerFactory 
implements LedgerManagerFac
         if (log.isDebugEnabled()) {
             log.debug("read ledger layout {}", layout);
         }
+        
conf.setMaxLedgerMetadataFormatVersion(layout.getMaxLedgerMetadataFormatVersion());
 
         // there is existing layout, we need to look into the layout.
         // handle pre V2 layout
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
index eb5b705..c39e6b6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
@@ -39,8 +39,14 @@ public class LedgerLayout {
     // version of ledger layout metadata
     public static final int LAYOUT_FORMAT_VERSION = 2;
 
-    private static final String splitter = ":";
-    private static final String lSplitter = "\n";
+    private static final String FIELD_SPLITTER = ":";
+    private static final String LINE_SPLITTER = "\n";
+
+    // For version 2 and below, max ledger metadata format wasn't stored in 
the layout
+    // so assume 2 if it is missing.
+    private static final int DEFAULT_MAX_LEDGER_METADATA_FORMAT_VERSION = 2;
+    private static final String MAX_LEDGER_METADATA_FORMAT_VERSION_FIELD =
+        "MAX_LEDGER_METADATA_FORMAT_VERSION";
 
     // ledger manager factory class
     private final String managerFactoryClass;
@@ -50,6 +56,9 @@ public class LedgerLayout {
     // layout version of how to store layout information
     private final int layoutFormatVersion;
 
+    // maximum format version that can be used for storing ledger metadata
+    private final int maxLedgerMetadataFormatVersion;
+
     /**
      * Ledger Layout Constructor.
      *
@@ -59,13 +68,17 @@ public class LedgerLayout {
      *          Ledger Manager Version
      */
     public LedgerLayout(String managerFactoryCls, int managerVersion) {
-        this(managerFactoryCls, managerVersion, LAYOUT_FORMAT_VERSION);
+        this(managerFactoryCls, managerVersion,
+             LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION,
+             LAYOUT_FORMAT_VERSION);
     }
 
     LedgerLayout(String managerFactoryCls, int managerVersion,
+                 int maxLedgerMetadataFormatVersion,
                  int layoutVersion) {
         this.managerFactoryClass = managerFactoryCls;
         this.managerVersion = managerVersion;
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
         this.layoutFormatVersion = layoutVersion;
     }
 
@@ -76,8 +89,11 @@ public class LedgerLayout {
      */
     public byte[] serialize() throws IOException {
         String s =
-          new StringBuilder().append(layoutFormatVersion).append(lSplitter)
-              
.append(managerFactoryClass).append(splitter).append(managerVersion).toString();
+          new StringBuilder().append(layoutFormatVersion).append(LINE_SPLITTER)
+            
.append(managerFactoryClass).append(FIELD_SPLITTER).append(managerVersion).append(LINE_SPLITTER)
+            
.append(MAX_LEDGER_METADATA_FORMAT_VERSION_FIELD).append(FIELD_SPLITTER)
+            .append(maxLedgerMetadataFormatVersion)
+            .toString();
 
         if (log.isDebugEnabled()) {
             log.debug("Serialized layout info: {}", s);
@@ -100,7 +116,7 @@ public class LedgerLayout {
             log.debug("Parsing Layout: {}", layout);
         }
 
-        String lines[] = layout.split(lSplitter);
+        String lines[] = layout.split(LINE_SPLITTER);
 
         try {
             int layoutFormatVersion = Integer.parseInt(lines[0]);
@@ -113,7 +129,7 @@ public class LedgerLayout {
                 throw new IOException("Ledger manager and its version absent 
from layout: " + layout);
             }
 
-            String[] parts = lines[1].split(splitter);
+            String[] parts = lines[1].split(FIELD_SPLITTER);
             if (parts.length != 2) {
                 throw new IOException("Invalid Ledger Manager defined in 
layout : " + layout);
             }
@@ -121,7 +137,18 @@ public class LedgerLayout {
             String managerFactoryCls = parts[0];
             // ledger manager version
             int managerVersion = Integer.parseInt(parts[1]);
-            return new LedgerLayout(managerFactoryCls, managerVersion, 
layoutFormatVersion);
+
+            int maxLedgerMetadataFormatVersion = 
DEFAULT_MAX_LEDGER_METADATA_FORMAT_VERSION;
+            if (lines.length >= 3) {
+                String[] metadataFormatParts = lines[2].split(FIELD_SPLITTER);
+                if (metadataFormatParts.length != 2
+                    || 
!metadataFormatParts[0].equals(MAX_LEDGER_METADATA_FORMAT_VERSION_FIELD)) {
+                    throw new IOException("Invalid field for max ledger 
metadata format:" + lines[2]);
+                }
+                maxLedgerMetadataFormatVersion = 
Integer.parseInt(metadataFormatParts[1]);
+            }
+            return new LedgerLayout(managerFactoryCls, managerVersion,
+                                    maxLedgerMetadataFormatVersion, 
layoutFormatVersion);
         } catch (NumberFormatException e) {
             throw new IOException("Could not parse layout '" + layout + "'", 
e);
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
index 6020a3b..4653af4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
@@ -63,6 +63,12 @@ public class LedgerMetadataSerDe {
     private static final String V1_CLOSED_TAG = "CLOSED";
     private static final int V1_IN_RECOVERY_ENTRY_ID = -102;
 
+    private final int maxLedgerMetadataFormatVersion;
+
+    public LedgerMetadataSerDe(int maxLedgerMetadataFormatVersion) {
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
+    }
+
     public byte[] serialize(LedgerMetadata metadata) {
         if (metadata.getMetadataFormatVersion() == 1) {
             return serializeVersion1(metadata);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index fc87632..38c6816 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -284,7 +284,7 @@ public class MSLedgerManagerFactory extends 
AbstractZkLedgerManagerFactory {
             this.conf = conf;
             this.zk = zk;
             this.metastore = metastore;
-            this.serDe = new LedgerMetadataSerDe();
+            this.serDe = new 
LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
 
             try {
                 ledgerTable = metastore.createScannableTable(TABLE_NAME);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
index 7fcaeda..537b50c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
@@ -53,7 +53,7 @@ public class GetLedgerMetaService implements 
HttpEndpointService {
         checkNotNull(conf);
         this.conf = conf;
         this.bookieServer = bookieServer;
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new 
LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
index f553b64..13022e4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
@@ -58,7 +58,7 @@ public class ListLedgerService implements HttpEndpointService 
{
         checkNotNull(conf);
         this.conf = conf;
         this.bookieServer = bookieServer;
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new 
LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
 
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
index a9ffc2d..d9141e6 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
@@ -80,7 +80,8 @@ public class LedgerMetadataTest {
             .withCreationTime(System.currentTimeMillis())
             .storingCreationTime(true)
             .build();
-        LedgerMetadataFormat format = new 
LedgerMetadataSerDe().buildProtoFormat(lm);
+        LedgerMetadataFormat format = new 
LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION)
+            .buildProtoFormat(lm);
         assertTrue(format.hasCtime());
     }
 
@@ -94,7 +95,8 @@ public class LedgerMetadataTest {
         LedgerMetadata lm = LedgerMetadataBuilder.create()
             .newEnsembleEntry(0L, ensemble).build();
 
-        LedgerMetadataFormat format = new 
LedgerMetadataSerDe().buildProtoFormat(lm);
+        LedgerMetadataFormat format = new 
LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION)
+            .buildProtoFormat(lm);
         assertFalse(format.hasCtime());
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 37f55b3..20a8d51 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -142,7 +142,7 @@ public class AbstractZkLedgerManagerTest extends 
MockZooKeeperTestCase {
         assertSame(conf, ledgerManager.conf);
         assertSame(scheduler, ledgerManager.scheduler);
 
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new 
LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
     }
 
     @After
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
index 398bb07..a73adeb 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
@@ -72,7 +72,7 @@ public class MockLedgerManager implements LedgerManager {
         this.metadataMap = metadataMap;
         this.executor = executor;
         this.ownsExecutor = ownsExecutor;
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new 
LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
     }
 
     public MockLedgerManager newClient() {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java
index 62315fe..16b30bd 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java
@@ -18,8 +18,12 @@
  */
 package org.apache.bookkeeper.meta;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
 
 import org.junit.Test;
 
@@ -63,4 +67,43 @@ public class TestLedgerLayout {
             hierarchical1.getLayoutFormatVersion());
     }
 
+    @Test
+    public void testParseNoMaxLedgerMetadataFormatVersion() throws Exception {
+        LedgerLayout layout = 
LedgerLayout.parseLayout("1\nblahblahLM:3".getBytes(UTF_8));
+
+        assertEquals(layout.getMaxLedgerMetadataFormatVersion(), 2);
+    }
+
+    @Test
+    public void testParseWithMaxLedgerMetadataFormatVersion() throws Exception 
{
+        LedgerLayout layout = LedgerLayout.parseLayout(
+                
"1\nblahblahLM:3\nMAX_LEDGER_METADATA_FORMAT_VERSION:123".getBytes(UTF_8));
+
+        assertEquals(layout.getMaxLedgerMetadataFormatVersion(), 123);
+    }
+
+    @Test
+    public void testCorruptMaxLedgerLayout() throws Exception {
+        try {
+            
LedgerLayout.parseLayout("1\nblahblahLM:3\nMAXXX_LEDGER_METADATA_FORMAT_VERSION:123".getBytes(UTF_8));
+            fail("Shouldn't have been able to parse");
+        } catch (IOException ioe) {
+            // expected
+        }
+
+        try {
+            
LedgerLayout.parseLayout("1\nblahblahLM:3\nMAXXX_LEDGER_METADATA_FORMAT_VERSION:blah".getBytes(UTF_8));
+            fail("Shouldn't have been able to parse");
+        } catch (IOException ioe) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testMoreFieldsAdded() throws Exception {
+        LedgerLayout layout = LedgerLayout.parseLayout(
+                
"1\nblahblahLM:3\nMAX_LEDGER_METADATA_FORMAT_VERSION:123\nFOO:BAR".getBytes(UTF_8));
+
+        assertEquals(layout.getMaxLedgerMetadataFormatVersion(), 123);
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index c4d441d..e88d29d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -387,7 +387,8 @@ public class TestHttpService extends 
BookKeeperClusterTestCase {
         assertEquals(1, respBody.size());
         // verify LedgerMetadata content is equal
         assertTrue(respBody.get(ledgerId.toString()).toString()
-                .equals(new String(new 
LedgerMetadataSerDe().serialize(lh[0].getLedgerMetadata()))));
+                .equals(new String(new 
LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION)
+                                   .serialize(lh[0].getLedgerMetadata()))));
     }
 
     @Test
diff --git 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index d571bf8..e20a9ba 100644
--- 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++ 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -45,6 +45,7 @@ import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.metadata.etcd.helpers.KeyIterator;
@@ -64,19 +65,8 @@ import org.apache.zookeeper.AsyncCallback.VoidCallback;
 @Slf4j
 class EtcdLedgerManager implements LedgerManager {
 
-    private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
-    private final Function<ByteSequence, LedgerMetadata> 
ledgerMetadataFunction = bs -> {
-        try {
-            return serDe.parseConfig(
-                bs.getBytes(),
-                Optional.empty()
-            );
-        } catch (IOException ioe) {
-            log.error("Could not parse ledger metadata : {}", 
bs.toStringUtf8(), ioe);
-            throw new RuntimeException(
-                "Could not parse ledger metadata : " + bs.toStringUtf8(), ioe);
-        }
-    };
+    private final LedgerMetadataSerDe serDe;
+    private final Function<ByteSequence, LedgerMetadata> 
ledgerMetadataFunction;
 
     private final String scope;
     private final Client client;
@@ -89,12 +79,24 @@ class EtcdLedgerManager implements LedgerManager {
 
     private volatile boolean closed = false;
 
-    EtcdLedgerManager(Client client,
+    EtcdLedgerManager(AbstractConfiguration conf,
+                      Client client,
                       String scope) {
         this.client = client;
         this.kvClient = client.getKVClient();
         this.scope = scope;
         this.watchClient = new EtcdWatchClient(client);
+        this.serDe = new 
LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
+
+        this.ledgerMetadataFunction = bs -> {
+            try {
+                return serDe.parseConfig(bs.getBytes(), Optional.empty());
+            } catch (IOException ioe) {
+                log.error("Could not parse ledger metadata : {}", 
bs.toStringUtf8(), ioe);
+                throw new RuntimeException(
+                        "Could not parse ledger metadata : " + 
bs.toStringUtf8(), ioe);
+            }
+        };
     }
 
     private boolean isClosed() {
diff --git 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
index dc7d151..9c19df0 100644
--- 
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
+++ 
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
@@ -42,6 +42,7 @@ class EtcdLedgerManagerFactory implements 
LedgerManagerFactory {
 
     private String scope;
     private Client client;
+    private AbstractConfiguration conf;
 
     @Override
     public int getCurrentVersion() {
@@ -66,6 +67,7 @@ class EtcdLedgerManagerFactory implements 
LedgerManagerFactory {
             throw new IOException("Invalid metadata service uri", e);
         }
         this.client = etcdLayoutManager.getClient();
+        this.conf = conf;
         return this;
     }
 
@@ -82,7 +84,7 @@ class EtcdLedgerManagerFactory implements 
LedgerManagerFactory {
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new EtcdLedgerManager(client, scope);
+        return new EtcdLedgerManager(conf, client, scope);
     }
 
     @Override
diff --git 
a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
 
b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
index 984224c..27dc84c 100644
--- 
a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
+++ 
b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.metadata.etcd.helpers.ValueStream;
@@ -81,7 +82,7 @@ public class EtcdLedgerManagerTest extends EtcdTestBase {
     public void setUp() throws Exception {
         super.setUp();
         this.scope = RandomStringUtils.randomAlphabetic(8);
-        this.lm = new EtcdLedgerManager(etcdClient, scope);
+        this.lm = new EtcdLedgerManager(new ClientConfiguration(), etcdClient, 
scope);
     }
 
     @Override

Reply via email to