jiazhai closed pull request #210: Issue 209: Support rename log
URL: https://github.com/apache/distributedlog/pull/210
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise show it once the PR is merged:

diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index cd5a17a2..d2e21695 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -25,6 +25,7 @@
 import java.net.URI;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.feature.FeatureProvider;
@@ -33,6 +34,7 @@
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.common.util.PermitLimiter;
 import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -194,6 +196,28 @@ public DistributedLogManager openLog(String logName,
     }
 
     @Override
+    public CompletableFuture<Void> renameLog(String oldName, String newName) {
+        try {
+            checkState();
+            final String oldLogName = validateAndNormalizeName(oldName);
+            final String newLogName = validateAndNormalizeName(newName);
+            // Look the location up with the normalized name, the same name the rename itself uses.
+            return driver.getLogMetadataStore().getLogLocation(oldLogName)
+                .thenCompose(uriOptional -> {
+                    if (uriOptional.isPresent()) {
+                        return driver.getLogStreamMetadataStore(WRITER)
+                            .renameLog(uriOptional.get(), oldLogName, newLogName);
+                    } else {
+                        return FutureUtils.exception(
+                            new LogNotFoundException("Log " + oldLogName + " isn't found."));
+                    }
+                });
+        } catch (IOException ioe) { // synchronous failures are reported through the returned future
+            return FutureUtils.exception(ioe);
+        }
+    }
+
+    @Override
     public boolean logExists(String logName)
         throws IOException, IllegalArgumentException {
         checkState();
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
index dafc0999..712295d4 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -107,6 +108,19 @@ void deleteLog(String logName)
             throws InvalidStreamNameException, LogNotFoundException, 
IOException;
 
     /**
+     * Rename a log from <i>oldName</i> to <i>newName</i>.
+     *
+     * @param oldName old log name
+     * @param newName new log name
+     * @return a future representing the rename result; on failure it is completed exceptionally with:
+     *     InvalidStreamNameException if a log name is invalid,
+     *     LogNotFoundException if the old log doesn't exist,
+     *     org.apache.distributedlog.exceptions.LogExistsException if the new log exists,
+     *     IOException when encountering issues with the backend.
+     */
+    CompletableFuture<Void> renameLog(String oldName, String newName);
+
+    /**
      * Open a log named <i>logName</i>.
      * A distributedlog manager is returned to access log <i>logName</i>.
      *
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 3c55edc8..b3250fa2 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -17,14 +17,20 @@
  */
 package org.apache.distributedlog.impl.metadata;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
+import static 
org.apache.distributedlog.DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
 import static org.apache.distributedlog.metadata.LogMetadata.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -35,13 +41,16 @@
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.ZooKeeperClient.ZooKeeperConnectionException;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.LockCancelledException;
+import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.exceptions.LogExistsException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
@@ -65,7 +74,10 @@
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Op.Create;
+import org.apache.zookeeper.Op.Delete;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooKeeper;
@@ -366,16 +378,16 @@ static void createMissingMetadata(final ZooKeeper zk,
             pathsToCreate.add(null);
         } else {
             String logRootParentPath = Utils.getParent(logRootPath);
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootParentPath, 
DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, 
createMode));
         }
 
         // log root path
         if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
             pathsToCreate.add(null);
         } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath, 
DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath, EMPTY_BYTES, acl, createMode));
         }
 
         // max id
@@ -398,15 +410,15 @@ static void createMissingMetadata(final ZooKeeper zk,
         if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
             pathsToCreate.add(null);
         } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + LOCK_PATH, 
DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + LOCK_PATH, EMPTY_BYTES, acl, 
createMode));
         }
         // read lock path
         if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
             pathsToCreate.add(null);
         } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, 
DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            pathsToCreate.add(EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, EMPTY_BYTES, 
acl, createMode));
         }
         // log segments path
         if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
@@ -422,9 +434,9 @@ static void createMissingMetadata(final ZooKeeper zk,
             if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
                 pathsToCreate.add(null);
             } else {
-                pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+                pathsToCreate.add(EMPTY_BYTES);
                 zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
-                        DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+                        EMPTY_BYTES, acl, createMode));
             }
         }
         if (zkOps.isEmpty()) {
@@ -620,4 +632,274 @@ public void processResult(int rc, String path, Object 
ctx) {
         }
         return promise;
     }
+
+    //
+    // Rename Log
+    //
+
+    @Override
+    public CompletableFuture<Void> renameLog(URI uri, String oldStreamName, 
String newStreamName) { // fetch the old stream's metadata first, then rename based on it
+        return getLog(
+            uri,
+            oldStreamName,
+            true, // NOTE(review): presumably getLog's "own allocator" flag - confirm against getLog
+            false // NOTE(review): presumably "create if not exists" = false - confirm against getLog
+        ).thenCompose(metadata -> renameLogMetadata(uri, metadata, 
newStreamName));
+    }
+
+    private CompletableFuture<Void> renameLogMetadata(URI uri,
+                                                      LogMetadataForWriter 
oldMetadata,
+                                                      String newStreamName) {
+        // Collect the creates for the new log and the deletes for the old log in two lists;
+        // executeRenameTxn concatenates them (creates first) and submits them in one ZooKeeper multi.
+        final LinkedList<Op> createOps = Lists.newLinkedList();
+        final LinkedList<Op> deleteOps = Lists.newLinkedList();
+        // creates are appended (addLast) and deletes are prepended (addFirst), so deletes run in reverse order
+        List<ACL> acls = zooKeeperClient.getDefaultACL();
+
+        // get the root paths of the old and the new log
+        String oldRootPath = oldMetadata.getLogRootPath();
+        String newRootPath = LogMetadata.getLogRootPath(
+            uri, newStreamName, conf.getUnpartitionedStreamName());
+
+        // 0. the old log stream path (ends up last among the deletes, as every later delete is prepended)
+        deleteOps.addFirst(Op.delete(
+            LogMetadata.getLogStreamPath(uri, oldMetadata.getLogName()), -1));
+
+        // 1. the root path
+        createOps.addLast(Op.create(
+            newRootPath, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath, -1));
+
+        // 2. max txid
+        Versioned<byte[]> maxTxIdData = oldMetadata.getMaxTxIdData();
+        deleteOldPathAndCreateNewPath(
+            oldRootPath, MAX_TXID_PATH, maxTxIdData,
+            newRootPath, DLUtils.serializeTransactionId(0L), acls,
+            createOps, deleteOps
+        );
+
+        // 3. version
+        createOps.addLast(Op.create(
+            newRootPath + VERSION_PATH, intToBytes(LAYOUT_VERSION), acls, 
CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath + VERSION_PATH, -1));
+
+        // 4. lock path (NOTE: if the stream is locked by a writer, the delete will fail, because
+        //    the lock path cannot be deleted while it still has children.)
+        createOps.addLast(Op.create(
+            newRootPath + LOCK_PATH, EMPTY_BYTES, acls, 
CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath + LOCK_PATH, -1));
+
+        // 5. read lock path (NOTE: same reason as the write lock)
+        createOps.addLast(Op.create(
+            newRootPath + READ_LOCK_PATH, EMPTY_BYTES, acls, 
CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldRootPath + READ_LOCK_PATH, -1));
+
+        // 6. allocation path
+        Versioned<byte[]> allocationData = oldMetadata.getAllocationData();
+        deleteOldPathAndCreateNewPath(
+            oldRootPath, ALLOCATION_PATH, allocationData,
+            newRootPath, EMPTY_BYTES, acls,
+            createOps, deleteOps);
+
+        // 7. log segments path (its data is the max log segment sequence number)
+        Versioned<byte[]> maxLSSNData = oldMetadata.getMaxLSSNData();
+        deleteOldPathAndCreateNewPath(
+            oldRootPath, LOGSEGMENTS_PATH, maxLSSNData,
+            newRootPath, 
DLUtils.serializeLogSegmentSequenceNumber(UNASSIGNED_LOGSEGMENT_SEQNO), acls,
+            createOps, deleteOps);
+
+        // 8. copy the log segments (only listed when the old log segments path exists)
+        CompletableFuture<List<LogSegmentMetadata>> segmentsFuture;
+        if (pathExists(maxLSSNData)) {
+            segmentsFuture = getLogSegments(zooKeeperClient, oldRootPath + 
LOGSEGMENTS_PATH);
+        } else {
+            segmentsFuture = FutureUtils.value(Collections.emptyList());
+        }
+        return segmentsFuture
+            // copy the segments
+            .thenApply(segments -> {
+                for (LogSegmentMetadata segment : segments) {
+                    deleteOldSegmentAndCreateNewSegment(
+                        segment,
+                        newRootPath + LOGSEGMENTS_PATH,
+                        acls,
+                        createOps,
+                        deleteOps);
+                }
+                return null;
+            })
+            // get the missing paths
+            .thenCompose(ignored ->
+                getMissingPaths(zooKeeperClient, uri, newStreamName)
+            )
+            // create the missing paths and execute the rename transaction
+            .thenCompose(paths -> {
+                for (String path : paths) { // listed child-first; prepending each one puts ancestors first
+                    createOps.addFirst(Op.create(
+                        path, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+                }
+                return executeRenameTxn(oldRootPath, newRootPath, createOps, 
deleteOps);
+            });
+    }
+
+    @VisibleForTesting
+    static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient 
zkc, URI uri, String logName) {
+        String basePath = uri.getPath();
+        String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
+        LinkedList<String> missingPaths = Lists.newLinkedList();
+        // walk from the log stream path up towards the base path; the future yields the nodes found missing
+        CompletableFuture<List<String>> future = FutureUtils.createFuture();
+        try {
+            existPath(zkc.get(), logStreamPath, basePath, missingPaths, 
future);
+        } catch (ZooKeeperConnectionException | InterruptedException e) {
+            future.completeExceptionally(e); // NOTE(review): interrupt status is not restored here - consider re-interrupting
+        }
+        return future;
+    }
+
+    private static void existPath(ZooKeeper zk,
+                                  String path,
+                                  String basePath,
+                                  LinkedList<String> missingPaths,
+                                  CompletableFuture<List<String>> future) { // checks path, then recurses into its parent
+        if (basePath.equals(path)) { // reached the base path: nothing further up is checked
+            future.complete(missingPaths);
+            return;
+        }
+        zk.exists(path, false, (rc, path1, ctx, stat) -> {
+            if (Code.OK.intValue() != rc && Code.NONODE.intValue() != rc) {
+                future.completeExceptionally(new ZKException("Failed to check 
existence of path " + path1,
+                    Code.get(rc)));
+                return;
+            }
+            // an existing node ends the upward walk; the paths collected so far are the result
+            if (Code.OK.intValue() == rc) {
+                future.complete(missingPaths);
+                return;
+            }
+            // NONODE: record this path as missing (child before parent) and continue with its parent
+            missingPaths.addLast(path);
+            String parentPath = Utils.getParent(path);
+            existPath(zk, parentPath, basePath, missingPaths, future);
+        }, null);
+    }
+
+    private CompletableFuture<Void> executeRenameTxn(String oldLogPath,
+                                                     String newLogPath,
+                                                     LinkedList<Op> createOps,
+                                                     LinkedList<Op> deleteOps) 
{
+        CompletableFuture<Void> future = FutureUtils.createFuture();
+        List<Op> zkOps = Lists.newArrayListWithExpectedSize(createOps.size() + 
deleteOps.size());
+        zkOps.addAll(createOps);
+        zkOps.addAll(deleteOps);
+        // all creates are listed before all deletes and are sent in a single multi request
+        if (LOG.isDebugEnabled()) {
+            for (Op op : zkOps) {
+                if (op instanceof Create) {
+                    Create create = (Create) op;
+                    LOG.debug("op : create {}", create.getPath());
+                } else if (op instanceof Delete) {
+                    Delete delete = (Delete) op;
+                    LOG.debug("op : delete {}, record = {}", delete.getPath(), 
op.toRequestRecord());
+                } else {
+                    LOG.debug("op : {}", op);
+                }
+            }
+        }
+        // NOTE(review): the InterruptedException caught below is forwarded without restoring the interrupt status
+        try {
+            zooKeeperClient.get().multi(zkOps, (rc, path, ctx, opResults) -> {
+                if (Code.OK.intValue() == rc) {
+                    future.complete(null);
+                } else if (Code.NODEEXISTS.intValue() == rc) {
+                    future.completeExceptionally(new 
LogExistsException("Someone just created new log " + newLogPath));
+                } else if (Code.NOTEMPTY.intValue() == rc) { // NOTE(review): presumably any non-empty old path ends up here, not only the lock path - confirm
+                    future.completeExceptionally(new 
LockingException(oldLogPath + LOCK_PATH,
+                        "Someone is holding a lock on log " + oldLogPath));
+                } else {
+                    future.completeExceptionally(new ZKException("Failed to 
rename log "
+                        + oldLogPath + " to " + newLogPath + " at path " + 
path, Code.get(rc)));
+                }
+            }, null);
+        } catch (ZooKeeperConnectionException e) {
+            future.completeExceptionally(e);
+        } catch (InterruptedException e) {
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
+    private static void deleteOldSegmentAndCreateNewSegment(LogSegmentMetadata 
oldMetadata,
+                                                            String 
newSegmentsPath,
+                                                            List<ACL> acls,
+                                                            LinkedList<Op> 
createOps,
+                                                            LinkedList<Op> 
deleteOps) { // queues the ops that move one segment znode under newSegmentsPath
+        createOps.addLast(Op.create(
+            newSegmentsPath + "/" + oldMetadata.getZNodeName(), // same znode name, new parent
+            oldMetadata.getFinalisedData().getBytes(UTF_8),
+            acls,
+            CreateMode.PERSISTENT));
+        deleteOps.addFirst(Op.delete(
+            oldMetadata.getZkPath(),
+            -1)); // ZooKeeper treats version -1 as "match any version"
+    }
+
+    private static void deleteOldPathAndCreateNewPath(String oldRootPath,
+                                                      String nodePath,
+                                                      Versioned<byte[]> 
pathData,
+                                                      String newRootPath,
+                                                      byte[] initData,
+                                                      List<ACL> acls,
+                                                      LinkedList<Op> createOps,
+                                                      LinkedList<Op> 
deleteOps) { // queues the create under newRootPath and, if the old node exists, its delete
+        if (pathExists(pathData)) { // old node exists: copy its data and delete it at the version carried by pathData
+            createOps.addLast(Op.create(
+                newRootPath + nodePath, pathData.getValue(), acls, 
CreateMode.PERSISTENT));
+            deleteOps.addFirst(Op.delete(
+                oldRootPath + nodePath, (int) ((LongVersion) 
pathData.getVersion()).getLongVersion()));
+        } else { // old node is missing: create the new node with initData; there is nothing to delete
+            createOps.addLast(Op.create(
+                newRootPath + nodePath, initData, acls, 
CreateMode.PERSISTENT));
+        }
+    }
+
+    @VisibleForTesting
+    static CompletableFuture<List<LogSegmentMetadata>> 
getLogSegments(ZooKeeperClient zk,
+                                                                      String 
logSegmentsPath) {
+        CompletableFuture<List<LogSegmentMetadata>> future = 
FutureUtils.createFuture();
+        try {
+            zk.get().getChildren(logSegmentsPath, false, (rc, path, ctx, 
children, stat) -> {
+                if (Code.OK.intValue() != rc) {
+                    if (Code.NONODE.intValue() == rc) {
+                        future.completeExceptionally(new 
LogNotFoundException("Log " + path + " not found"));
+                    } else {
+                        future.completeExceptionally(new ZKException("Failed 
to get log segments from " + path,
+                            Code.get(rc)));
+                    }
+                    return;
+                }
+                // the listing succeeded: every child name is one log segment znode
+                // read the metadata of each segment and hand the collected list to the returned future
+                List<CompletableFuture<LogSegmentMetadata>> futures =
+                    Lists.newArrayListWithExpectedSize(children.size());
+                for (String child : children) {
+                    futures.add(LogSegmentMetadata.read(zk, logSegmentsPath + 
"/" + child));
+                }
+                FutureUtils.proxyTo(
+                    FutureUtils.collect(futures),
+                    future);
+            }, null);
+        } catch (ZooKeeperConnectionException e) {
+            future.completeExceptionally(e);
+        } catch (InterruptedException e) { // NOTE(review): interrupt status is not restored here - consider re-interrupting
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
 }
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
index 41dc5005..712619a5 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
@@ -95,6 +95,18 @@
     CompletableFuture<Void> deleteLog(URI uri, String streamName);
 
     /**
+     * Rename the log from <i>oldStreamName</i> to <i>newStreamName</i>.
+     *
+     * @param uri the location where the metadata of the log is stored
+     * @param oldStreamName the old name of the log stream
+     * @param newStreamName the new name of the log stream
+     * @return a future representing the result of the rename operation.
+     */
+    CompletableFuture<Void> renameLog(URI uri,
+                                      String oldStreamName,
+                                      String newStreamName);
+
+    /**
      * Get the log segment metadata store.
      *
      * @return the log segment metadata store.
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index f1cec9d9..4aa832a0 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -17,36 +17,60 @@
  */
 package org.apache.distributedlog.impl.metadata;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
 import static 
org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
 import static org.apache.distributedlog.metadata.LogMetadata.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import java.net.URI;
+import java.util.Collections;
 import java.util.List;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClusterTestCase;
 import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.LogExistsException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Transaction;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -69,6 +93,8 @@
 
     private ZooKeeperClient zkc;
     private URI uri;
+    private OrderedScheduler scheduler;
+    private ZKLogStreamMetadataStore metadataStore;
 
     private static void createLog(ZooKeeperClient zk, URI uri, String logName, 
String logIdentifier)
             throws Exception {
@@ -88,17 +114,68 @@ private static void createLog(ZooKeeperClient zk, URI uri, 
String logName, Strin
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES,
+        txn.create(lockPath, EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
+        txn.create(readLockPath, EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.create(versionPath, intToBytes(LAYOUT_VERSION),
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
+        txn.create(allocationPath, EMPTY_BYTES,
                 zk.getDefaultACL(), CreateMode.PERSISTENT);
         txn.commit();
     }
 
+    private static void createLog(ZooKeeperClient zk,
+                                  URI uri,
+                                  String logName,
+                                  String logIdentifier,
+                                  int numSegments)
+            throws Exception {
+        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
+        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
+        final String lockPath = logRootPath + LOCK_PATH;
+        final String readLockPath = logRootPath + READ_LOCK_PATH;
+        final String versionPath = logRootPath + VERSION_PATH;
+        final String allocationPath = logRootPath + ALLOCATION_PATH;
+        // create the log root path first; everything below it is created in one transaction
+        Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0],
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        Transaction txn = zk.get().transaction();
+        txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(
+                        DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(lockPath, EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(readLockPath, EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(versionPath, intToBytes(LAYOUT_VERSION),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(allocationPath, EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        // add numSegments completed log segments under the log segments path
+        for (int i = 0; i < numSegments; i++) {
+            LogSegmentMetadata segment = DLMTestUtil.completedLogSegment(
+                logSegmentsPath,
+                i + 1L,
+                1L + i * 1000L,
+                (i + 1) * 1000L,
+                1000,
+                i + 1L,
+                999L,
+                0L);
+            txn.create(
+                segment.getZkPath(),
+                segment.getFinalisedData().getBytes(UTF_8),
+                zk.getDefaultACL(),
+                CreateMode.PERSISTENT);
+        }
+        // commit the metadata nodes and the segment nodes together
+        txn.commit();
+    }
+
     @Before
     public void setup() throws Exception {
         zkc = TestZooKeeperClientBuilder.newBuilder()
@@ -117,10 +194,26 @@ public void setup() throws Exception {
         } catch (KeeperException.NodeExistsException nee) {
             logger.debug("The namespace uri already exists.");
         }
+        scheduler = OrderedScheduler.newBuilder()
+            .name("test-scheduler")
+            .corePoolSize(1)
+            .build();
+        metadataStore = new ZKLogStreamMetadataStore(
+            "test-logstream-metadata-store",
+            new DistributedLogConfiguration(),
+            zkc,
+            scheduler,
+            NullStatsLogger.INSTANCE);
     }
 
     @After
     public void teardown() throws Exception {
+        if (null != metadataStore) { // guard: setup() assigns the store only at its very end
+            metadataStore.close();
+        }
+        if (null != scheduler) { // guard: the scheduler may not have been built either
+            scheduler.shutdown();
+        }
         zkc.close();
     }
 
@@ -323,4 +416,167 @@ public void testCreateLogMetadataWithCustomMetadata() throws Exception {
         testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
     }
 
+    @Test(timeout = 60000, expected = LogNotFoundException.class)
+    public void testGetLogSegmentsLogNotFound() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+
+        String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier);
+        FutureUtils.result(getLogSegments(zkc, logSegmentsPath));
+    }
+
+    @Test(timeout = 60000)
+    public void testGetLogSegmentsZKExceptions() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+
+        ZooKeeper mockZk = mock(ZooKeeper.class);
+        ZooKeeperClient mockZkc = mock(ZooKeeperClient.class);
+        when(mockZkc.get()).thenReturn(mockZk);
+        doAnswer(invocationOnMock -> {
+            String path = (String) invocationOnMock.getArguments()[0];
+            Children2Callback callback = (Children2Callback) invocationOnMock.getArguments()[2];
+            callback.processResult(Code.BADVERSION.intValue(), path, null, null, null);
+            return null;
+        }).when(mockZk).getChildren(anyString(), anyBoolean(), any(Children2Callback.class), anyObject());
+
+        String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier);
+        try {
+            FutureUtils.result(getLogSegments(mockZkc, logSegmentsPath));
+            fail("Should fail to get log segments when encountering zk exceptions");
+        } catch (ZKException zke) {
+            assertEquals(Code.BADVERSION, zke.getKeeperExceptionCode());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testGetLogSegments() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+
+        // create log
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            5);
+
+        List<LogSegmentMetadata> segments = FutureUtils.result(
+            getLogSegments(zkc, LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier)));
+        assertEquals(5, segments.size());
+        for (int i = 0; i < 5; i++) {
+            assertEquals(1L + i, segments.get(i).getLogSegmentSequenceNumber());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testGetMissingPathsRecursive() throws Exception {
+        List<String> missingPaths = FutureUtils.result(
+            getMissingPaths(zkc, uri, "path/to/log"));
+
+        assertEquals(
+            Lists.newArrayList(
+                uri.getPath() + "/path/to/log",
+                uri.getPath() + "/path/to",
+                uri.getPath() + "/path"
+            ),
+            missingPaths);
+    }
+
+    @Test(timeout = 60000)
+    public void testGetMissingPathsRecursive2() throws Exception {
+        String path = uri.getPath() + "/path/to/log";
+        ZkUtils.createFullPathOptimistic(
+            zkc.get(), path, EMPTY_BYTES, zkc.getDefaultACL(), CreateMode.PERSISTENT);
+
+        List<String> missingPaths = FutureUtils.result(
+            getMissingPaths(zkc, uri, "path/to/log"));
+
+        assertEquals(
+            Collections.emptyList(),
+            missingPaths);
+    }
+
+    @Test(timeout = 60000)
+    public void testGetMissingPathsFailure() throws Exception {
+        ZooKeeper mockZk = mock(ZooKeeper.class);
+        ZooKeeperClient mockZkc = mock(ZooKeeperClient.class);
+        when(mockZkc.get()).thenReturn(mockZk);
+        doAnswer(invocationOnMock -> {
+            String path = (String) invocationOnMock.getArguments()[0];
+            StatCallback callback = (StatCallback) invocationOnMock.getArguments()[2];
+            callback.processResult(Code.BADVERSION.intValue(), path, null, null);
+            return null;
+        }).when(mockZk).exists(anyString(), anyBoolean(), any(StatCallback.class), anyObject());
+
+        try {
+            FutureUtils.result(getMissingPaths(mockZkc, uri, "path/to/log"));
+            fail("Should fail on getting missing paths on zookeeper exceptions.");
+        } catch (ZKException zke) {
+            assertEquals(Code.BADVERSION, zke.getKeeperExceptionCode());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testRenameLog() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        int numSegments = 5;
+
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            numSegments);
+
+        String newLogName = "path/to/new/" + logName;
+        FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+    }
+
+    @Test(timeout = 60000, expected = LogExistsException.class)
+    public void testRenameLogExists() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        int numSegments = 5;
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            numSegments);
+
+        String newLogName = "path/to/new/" + logName;
+        createLog(
+            zkc,
+            uri,
+            newLogName,
+            logIdentifier,
+            3);
+
+        FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+    }
+
+    @Test(timeout = 60000, expected = LockingException.class)
+    public void testRenameLockedLog() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        int numSegments = 5;
+        createLog(
+            zkc,
+            uri,
+            logName,
+            logIdentifier,
+            numSegments);
+
+        // create a lock
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        String lockPath = logRootPath + LOCK_PATH;
+        zkc.get().create(lockPath + "/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+        String newLogName = "path/to/new/" + logName;
+        FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to