http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java deleted file mode 100644 index bf16256..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControl.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.acl; - -import com.google.common.base.Objects; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TJSONProtocol; -import org.apache.thrift.transport.TMemoryBuffer; -import org.apache.thrift.transport.TMemoryInputTransport; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; - -import static com.google.common.base.Charsets.UTF_8; - -public class ZKAccessControl { - - private static final int BUFFER_SIZE = 4096; - - public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry(); - - public static class CorruptedAccessControlException extends IOException { - - private static final long serialVersionUID = 5391285182476211603L; - - public CorruptedAccessControlException(String zkPath, Throwable t) { - super("Access Control @ " + zkPath + " is corrupted.", t); - } - } - - protected final AccessControlEntry accessControlEntry; - protected final String zkPath; - private int zkVersion; - - public ZKAccessControl(AccessControlEntry ace, String zkPath) { - this(ace, zkPath, -1); - } - - private ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) { - this.accessControlEntry = ace; - this.zkPath = zkPath; - this.zkVersion = zkVersion; - } - - @Override - public int hashCode() { - return Objects.hashCode(zkPath, accessControlEntry); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof ZKAccessControl)) { - return false; - } - ZKAccessControl other = (ZKAccessControl) obj; - return Objects.equal(zkPath, other.zkPath) && - Objects.equal(accessControlEntry, other.accessControlEntry); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("entry(path=").append(zkPath).append(", acl=") - .append(accessControlEntry).append(")"); - return sb.toString(); - } - - public String getZKPath() { - return zkPath; - } - - public AccessControlEntry getAccessControlEntry() { - return accessControlEntry; - } - - public Future<ZKAccessControl> create(ZooKeeperClient zkc) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - try { - zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT, - new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (KeeperException.Code.OK.intValue() == rc) { - ZKAccessControl.this.zkVersion = 0; - promise.setValue(ZKAccessControl.this); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } catch (IOException e) { - promise.setException(e); - } - return promise; - } - - public Future<ZKAccessControl> update(ZooKeeperClient zkc) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - try { - zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - ZKAccessControl.this.zkVersion = stat.getVersion(); - promise.setValue(ZKAccessControl.this); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } catch (IOException e) { - promise.setException(e); - } - return promise; - } - - public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - - try { - zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - try { - AccessControlEntry ace = deserialize(zkPath, data); - promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion())); - } catch (IOException ioe) { - promise.setException(ioe); - } - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - return promise; - } - - public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) { - final Promise<Void> promise = new Promise<Void>(); - - try { - zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (KeeperException.Code.OK.intValue() == rc || - KeeperException.Code.NONODE.intValue() == rc) { - promise.setValue(null); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - return promise; - } - - static byte[] serialize(AccessControlEntry ace) throws IOException { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - ace.write(protocol); - transport.flush(); - return transport.toString(UTF_8.name()).getBytes(UTF_8); - } catch (TException e) { - throw new IOException("Failed to serialize access control entry : ", e); - } catch (UnsupportedEncodingException uee) { - throw new IOException("Failed to serialize acesss control entry : ", uee); - } - } - - static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException { - if (data.length == 0) { - return DEFAULT_ACCESS_CONTROL_ENTRY; - } - - AccessControlEntry ace = new AccessControlEntry(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - ace.read(protocol); - } catch (TException e) { - throw new CorruptedAccessControlException(zkPath, e); - } - return ace; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java deleted file mode 100644 index 9c89b4a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/ZKAccessControlManager.java +++ /dev/null @@ -1,373 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.acl; - -import com.google.common.collect.Sets; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager} - */ -public class ZKAccessControlManager implements AccessControlManager, Watcher { - - private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class); - - private static final int ZK_RETRY_BACKOFF_MS = 500; - - protected final DistributedLogConfiguration conf; - protected final ZooKeeperClient zkc; - protected final String zkRootPath; - protected final ScheduledExecutorService scheduledExecutorService; - - protected final ConcurrentMap<String, ZKAccessControl> streamEntries; - protected ZKAccessControl defaultAccessControl; - protected volatile boolean closed = false; - - public ZKAccessControlManager(DistributedLogConfiguration conf, - ZooKeeperClient zkc, - String zkRootPath, - ScheduledExecutorService scheduledExecutorService) throws IOException { - this.conf = conf; - this.zkc = zkc; - this.zkRootPath = zkRootPath; - this.scheduledExecutorService = scheduledExecutorService; - this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>(); - try { - Await.result(fetchDefaultAccessControlEntry()); - } catch (Throwable t) { - if (t instanceof InterruptedException) { - throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t); - } else if (t instanceof KeeperException) { - throw new IOException("Encountered zookeeper exception on getting default access control entry for " + zkRootPath, t); - } else if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t); - } - } - - try { - Await.result(fetchAccessControlEntries()); - } catch (Throwable t) { - if (t instanceof InterruptedException) { - throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t); - } else if (t instanceof KeeperException) { - throw new IOException("Encountered zookeeper exception on getting access control entries for " + zkRootPath, t); - } else if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t); - } - } - } - - protected AccessControlEntry getAccessControlEntry(String stream) { - ZKAccessControl entry = streamEntries.get(stream); - entry = null == entry ? defaultAccessControl : entry; - return entry.getAccessControlEntry(); - } - - @Override - public boolean allowWrite(String stream) { - return !getAccessControlEntry(stream).isDenyWrite(); - } - - @Override - public boolean allowTruncate(String stream) { - return !getAccessControlEntry(stream).isDenyTruncate(); - } - - @Override - public boolean allowDelete(String stream) { - return !getAccessControlEntry(stream).isDenyDelete(); - } - - @Override - public boolean allowAcquire(String stream) { - return !getAccessControlEntry(stream).isDenyAcquire(); - } - - @Override - public boolean allowRelease(String stream) { - return !getAccessControlEntry(stream).isDenyRelease(); - } - - @Override - public void close() { - closed = true; - } - - private Future<Void> fetchAccessControlEntries() { - final Promise<Void> promise = new Promise<Void>(); - fetchAccessControlEntries(promise); - return promise; - } - - private void fetchAccessControlEntries(final Promise<Void> promise) { - try { - zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (KeeperException.Code.OK.intValue() != rc) { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - return; - } - Set<String> streamsReceived = new HashSet<String>(); - streamsReceived.addAll(children); - Set<String> streamsCached = streamEntries.keySet(); - Set<String> streamsRemoved = Sets.difference(streamsCached, streamsReceived).immutableCopy(); - for (String s : streamsRemoved) { - ZKAccessControl accessControl = streamEntries.remove(s); - if (null != accessControl) { - logger.info("Removed Access Control Entry for stream {} : {}", s, accessControl.getAccessControlEntry()); - } - } - if (streamsReceived.isEmpty()) { - promise.setValue(null); - return; - } - final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size()); - final AtomicInteger numFailures = new AtomicInteger(0); - for (String s : streamsReceived) { - final String streamName = s; - ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null) - .addEventListener(new FutureEventListener<ZKAccessControl>() { - - @Override - public void onSuccess(ZKAccessControl accessControl) { - streamEntries.put(streamName, accessControl); - logger.info("Added overrided access control for stream {} : {}", streamName, accessControl.getAccessControlEntry()); - complete(); - } - - @Override - public void onFailure(Throwable cause) { - if (cause instanceof KeeperException.NoNodeException) { - streamEntries.remove(streamName); - } else if (cause instanceof ZKAccessControl.CorruptedAccessControlException) { - logger.warn("Access control is corrupted for stream {} @ {}, skipped it ...", - new Object[] { streamName, zkRootPath, cause }); - streamEntries.remove(streamName); - } else { - if (1 == numFailures.incrementAndGet()) { - promise.setException(cause); - } - } - complete(); - } - - private void complete() { - if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) { - promise.setValue(null); - } - } - }); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - } - - private Future<ZKAccessControl> fetchDefaultAccessControlEntry() { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - fetchDefaultAccessControlEntry(promise); - return promise; - } - - private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) { - ZKAccessControl.read(zkc, zkRootPath, this) - .addEventListener(new FutureEventListener<ZKAccessControl>() { - @Override - public void onSuccess(ZKAccessControl accessControl) { - logger.info("Default Access Control will be changed from {} to {}", - ZKAccessControlManager.this.defaultAccessControl, - accessControl); - ZKAccessControlManager.this.defaultAccessControl = accessControl; - promise.setValue(accessControl); - } - - @Override - public void onFailure(Throwable cause) { - if (cause instanceof KeeperException.NoNodeException) { - logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath); - createDefaultAccessControlEntryIfNeeded(promise); - } else { - promise.setException(cause); - } - } - }); - } - - private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) { - ZooKeeper zk; - try { - zk = zkc.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - return; - } catch (InterruptedException e) { - promise.setException(e); - return; - } - ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(), - CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (KeeperException.Code.OK.intValue() == rc) { - logger.info("Created zk path {} for default ACL.", zkRootPath); - fetchDefaultAccessControlEntry(promise); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } - - private void refetchDefaultAccessControlEntry(final int delayMs) { - if (closed) { - return; - } - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { - @Override - public void onSuccess(ZKAccessControl value) { - // no-op - } - @Override - public void onFailure(Throwable cause) { - if (cause instanceof ZKAccessControl.CorruptedAccessControlException) { - logger.warn("Default access control entry is corrupted, ignore this update : ", cause); - return; - } - - logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS); - } - }); - } - }, delayMs, TimeUnit.MILLISECONDS); - } - - private void refetchAccessControlEntries(final int delayMs) { - if (closed) { - return; - } - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - // no-op - } - @Override - public void onFailure(Throwable cause) { - logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS); - } - }); - } - }, delayMs, TimeUnit.MILLISECONDS); - } - - private void refetchAllAccessControlEntries(final int delayMs) { - if (closed) { - return; - } - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { - @Override - public void onSuccess(ZKAccessControl value) { - fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - // no-op - } - - @Override - public void onFailure(Throwable cause) { - logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS); - } - }); - } - - @Override - public void onFailure(Throwable cause) { - logger.warn("Encountered an error on refetching all access control entries, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS); - } - }); - } - }, delayMs, TimeUnit.MILLISECONDS); - } - - @Override - public void process(WatchedEvent event) { - if (Event.EventType.None.equals(event.getType())) { - if (event.getState() == Event.KeeperState.Expired) { - refetchAllAccessControlEntries(0); - } - } else if (Event.EventType.NodeDataChanged.equals(event.getType())) { - logger.info("Default ACL for {} is changed, refetching ...", zkRootPath); - refetchDefaultAccessControlEntry(0); - } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) { - logger.info("List of ACLs for {} are changed, refetching ...", zkRootPath); - refetchAccessControlEntries(0); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java index 0a3fdb0..0512907 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java @@ -18,28 +18,27 @@ package com.twitter.distributedlog.admin; import com.google.common.base.Preconditions; -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.DistributedLogConfiguration; +import com.google.common.collect.Lists; import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.LogRecordWithDLSN; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.ReadUtils; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.acl.ZKAccessControl; +import com.twitter.distributedlog.impl.BKNamespaceDriver; +import com.twitter.distributedlog.impl.acl.ZKAccessControl; import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore; -import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore; -import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.metadata.MetadataUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.thrift.AccessControlEntry; import com.twitter.distributedlog.tools.DistributedLogTool; -import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.SchedulerUtils; @@ -61,6 +60,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.SortedSet; @@ -76,7 +76,6 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Admin Tool for DistributedLog. */ -@SuppressWarnings("deprecation") public class DistributedLogAdmin extends DistributedLogTool { static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class); @@ -84,8 +83,8 @@ public class DistributedLogAdmin extends DistributedLogTool { /** * Fix inprogress segment with lower ledger sequence number. * - * @param factory - * dlm factory. + * @param namespace + * dl namespace * @param metadataUpdater * metadata updater. * @param streamName @@ -96,12 +95,12 @@ public class DistributedLogAdmin extends DistributedLogTool { * is confirmation needed before executing actual action. * @throws IOException */ - public static void fixInprogressSegmentWithLowerSequenceNumber(final com.twitter.distributedlog.DistributedLogManagerFactory factory, + public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace, final MetadataUpdater metadataUpdater, final String streamName, final boolean verbose, final boolean interactive) throws IOException { - DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName); + DistributedLogManager dlm = namespace.openLog(streamName); try { List<LogSegmentMetadata> segments = dlm.getLogSegments(); if (verbose) { @@ -194,37 +193,37 @@ public class DistributedLogAdmin extends DistributedLogTool { } public static void checkAndRepairDLNamespace(final URI uri, - final com.twitter.distributedlog.DistributedLogManagerFactory factory, + final DistributedLogNamespace namespace, final MetadataUpdater metadataUpdater, final OrderedScheduler scheduler, - final BookKeeperClient bkc, - final String digestpw, final boolean verbose, final boolean interactive) throws IOException { - checkAndRepairDLNamespace(uri, factory, metadataUpdater, scheduler, bkc, digestpw, verbose, interactive, 1); + checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1); } public static void checkAndRepairDLNamespace(final URI uri, - final com.twitter.distributedlog.DistributedLogManagerFactory factory, + final DistributedLogNamespace namespace, final MetadataUpdater metadataUpdater, final OrderedScheduler scheduler, - final BookKeeperClient bkc, - final String digestpw, final boolean verbose, final boolean interactive, final int concurrency) throws IOException { Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found."); // 0. getting streams under a given uri. - Collection<String> streams = factory.enumerateAllLogsInNamespace(); + Iterator<String> streamsIter = namespace.getLogs(); + List<String> streams = Lists.newArrayList(); + while (streamsIter.hasNext()) { + streams.add(streamsIter.next()); + } if (verbose) { - System.out.println("- 0. checking " + streams.size() + " streams under " + uri); + System.out.println("- 0. checking streams under " + uri); } if (streams.size() == 0) { System.out.println("+ 0. nothing to check. quit."); return; } Map<String, StreamCandidate> streamCandidates = - checkStreams(factory, streams, scheduler, bkc, digestpw, concurrency); + checkStreams(namespace, streams, scheduler, concurrency); if (verbose) { System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found."); } @@ -248,11 +247,9 @@ public class DistributedLogAdmin extends DistributedLogTool { } private static Map<String, StreamCandidate> checkStreams( - final com.twitter.distributedlog.DistributedLogManagerFactory factory, + final DistributedLogNamespace namespace, final Collection<String> streams, final OrderedScheduler scheduler, - final BookKeeperClient bkc, - final String digestpw, final int concurrency) throws IOException { final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>(); @@ -275,7 +272,7 @@ public class DistributedLogAdmin extends DistributedLogTool { StreamCandidate candidate; try { LOG.info("Checking stream {}.", stream); - candidate = checkStream(factory, stream, scheduler, bkc, digestpw); + candidate = checkStream(namespace, stream, scheduler); LOG.info("Checked stream {} - {}.", stream, candidate); } catch (IOException e) { LOG.error("Error on checking stream {} : ", stream, e); @@ -316,12 +313,10 @@ public class DistributedLogAdmin extends DistributedLogTool { } private static StreamCandidate checkStream( - final com.twitter.distributedlog.DistributedLogManagerFactory factory, + final DistributedLogNamespace namespace, final String streamName, - final OrderedScheduler scheduler, - final BookKeeperClient bkc, - String digestpw) throws IOException { - DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName); + final OrderedScheduler scheduler) throws IOException { + DistributedLogManager dlm = namespace.openLog(streamName); try { List<LogSegmentMetadata> segments = dlm.getLogSegments(); if (segments.isEmpty()) { @@ -330,7 +325,7 @@ public class DistributedLogAdmin extends DistributedLogTool { List<Future<LogSegmentCandidate>> futures = new ArrayList<Future<LogSegmentCandidate>>(segments.size()); for (LogSegmentMetadata segment : segments) { - futures.add(checkLogSegment(streamName, segment, scheduler, bkc, digestpw)); + futures.add(checkLogSegment(namespace, streamName, segment, scheduler)); } List<LogSegmentCandidate> segmentCandidates; try { @@ -354,21 +349,16 @@ public class DistributedLogAdmin extends DistributedLogTool { } private static Future<LogSegmentCandidate> checkLogSegment( + final DistributedLogNamespace namespace, final String streamName, final LogSegmentMetadata metadata, - final OrderedScheduler scheduler, - final BookKeeperClient bkc, - final String digestpw) { + final OrderedScheduler scheduler) { if (metadata.isInProgress()) { return Future.value(null); } - final LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore( - new DistributedLogConfiguration().setBKDigestPW(digestpw), - bkc, - scheduler, - NullStatsLogger.INSTANCE, - AsyncFailureInjector.NULL); + final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver() + .getLogSegmentEntryStore(NamespaceDriver.Role.READER); return ReadUtils.asyncReadLastRecord( streamName, metadata, @@ -432,6 +422,8 @@ public class DistributedLogAdmin extends DistributedLogTool { /** * Unbind the bookkeeper environment for a given distributedlog uri. + * + * TODO: move unbind operation to namespace driver */ class UnbindCommand extends OptsCommand { @@ -491,6 +483,8 @@ public class DistributedLogAdmin extends DistributedLogTool { /** * Bind Command to bind bookkeeper environment for a given distributed uri. + * + * TODO: move bind to namespace driver */ class BindCommand extends OptsCommand { @@ -559,7 +553,7 @@ public class DistributedLogAdmin extends DistributedLogTool { if (cmdline.hasOption("dlzw")) { dlZkServersForWriter = cmdline.getOptionValue("dlzw"); } else { - dlZkServersForWriter = DLUtils.getZKServersFromDLUri(uri); + dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri); } if (cmdline.hasOption("dlzr")) { dlZkServersForReader = cmdline.getOptionValue("dlzr"); @@ -689,7 +683,7 @@ public class DistributedLogAdmin extends DistributedLogTool { return -1; } for (String stream : streams) { - fixInprogressSegmentWithLowerSequenceNumber(getFactory(), metadataUpdater, stream, verbose, !getForce()); + fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), metadataUpdater, stream, verbose, !getForce()); } return 0; } @@ -739,10 +733,9 @@ public class DistributedLogAdmin extends DistributedLogTool { .corePoolSize(Runtime.getRuntime().availableProcessors()) .build(); ExecutorService executorService = Executors.newCachedThreadPool(); - BookKeeperClient bkc = getBookKeeperClient(); try { - checkAndRepairDLNamespace(getUri(), getFactory(), metadataUpdater, scheduler, - bkc, getConf().getBKDigestPW(), verbose, !getForce(), concurrency); + checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler, + verbose, !getForce(), concurrency); } finally { SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java index e551c22..a081606 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java @@ -21,18 +21,20 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; -import com.twitter.distributedlog.BKDistributedLogNamespace; import com.twitter.distributedlog.BookKeeperClient; import com.twitter.distributedlog.BookKeeperClientBuilder; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClientBuilder; import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; +import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.util.DLUtils; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -52,8 +54,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,7 +74,6 @@ import static com.google.common.base.Charsets.UTF_8; /** * DL Auditor will audit DL namespace, e.g. find leaked ledger, report disk usage by streams. */ -@SuppressWarnings("deprecation") public class DLAuditor { private static final Logger logger = LoggerFactory.getLogger(DLAuditor.class); @@ -83,23 +84,23 @@ public class DLAuditor { this.conf = conf; } - private ZooKeeperClient getZooKeeperClient(com.twitter.distributedlog.DistributedLogManagerFactory factory) { - DistributedLogNamespace namespace = factory.getNamespace(); - assert(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL(); + private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) { + NamespaceDriver driver = namespace.getNamespaceDriver(); + assert(driver instanceof BKNamespaceDriver); + return ((BKNamespaceDriver) driver).getWriterZKC(); } - private BookKeeperClient getBookKeeperClient(com.twitter.distributedlog.DistributedLogManagerFactory factory) { - DistributedLogNamespace namespace = factory.getNamespace(); - assert(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getReaderBKC(); + private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) { + NamespaceDriver driver = namespace.getNamespaceDriver(); + assert(driver instanceof BKNamespaceDriver); + return ((BKNamespaceDriver) driver).getReaderBKC(); } private String validateAndGetZKServers(List<URI> uris) { URI firstURI = uris.get(0); - String zkServers = DLUtils.getZKServersFromDLUri(firstURI); + String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI); for (URI uri : uris) { - if (!zkServers.equalsIgnoreCase(DLUtils.getZKServersFromDLUri(uri))) { + if (!zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) { throw new IllegalArgumentException("Uris don't belong to same zookeeper cluster"); } } @@ -224,19 +225,23 @@ public class DLAuditor { private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths) throws IOException { final Set<Long> ledgers = new TreeSet<Long>(); - List<com.twitter.distributedlog.DistributedLogManagerFactory> factories = - new ArrayList<com.twitter.distributedlog.DistributedLogManagerFactory>(uris.size()); + List<DistributedLogNamespace> namespaces = + new ArrayList<DistributedLogNamespace>(uris.size()); try { for (URI uri : uris) { - factories.add(new com.twitter.distributedlog.DistributedLogManagerFactory(conf, uri)); + namespaces.add( + DistributedLogNamespaceBuilder.newBuilder() + .conf(conf) + .uri(uri) + .build()); } final CountDownLatch doneLatch = new CountDownLatch(uris.size()); final AtomicInteger numFailures = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(uris.size()); try { int i = 0; - for (com.twitter.distributedlog.DistributedLogManagerFactory factory : factories) { - final com.twitter.distributedlog.DistributedLogManagerFactory dlFactory = factory; + for (final DistributedLogNamespace namespace : namespaces) { + final DistributedLogNamespace dlNamespace = namespace; final URI uri = uris.get(i); final List<String> aps = allocationPaths.get(i); i++; @@ -245,12 +250,12 @@ public class DLAuditor { public void run() { try { logger.info("Collecting ledgers from {} : {}", uri, aps); - collectLedgersFromAllocator(uri, dlFactory, aps, ledgers); + collectLedgersFromAllocator(uri, namespace, aps, ledgers); synchronized (ledgers) { logger.info("Collected {} ledgers from allocators for {} : {} ", new Object[]{ledgers.size(), uri, ledgers}); } - collectLedgersFromDL(uri, dlFactory, ledgers); + collectLedgersFromDL(uri, namespace, ledgers); } catch (IOException e) { numFailures.incrementAndGet(); logger.info("Error to collect ledgers from DL : ", e); @@ -273,15 +278,15 @@ public class DLAuditor { executor.shutdown(); } } finally { - for (com.twitter.distributedlog.DistributedLogManagerFactory factory : factories) { - factory.close(); + for (DistributedLogNamespace namespace : namespaces) { + namespace.close(); } } return ledgers; } private void collectLedgersFromAllocator(final URI uri, - final com.twitter.distributedlog.DistributedLogManagerFactory factory, + final DistributedLogNamespace namespace, final List<String> allocationPaths, final Set<Long> ledgers) throws IOException { final LinkedBlockingQueue<String> poolQueue = @@ -289,7 +294,7 @@ public class DLAuditor { for (String allocationPath : allocationPaths) { String rootPath = uri.getPath() + "/" + allocationPath; try { - List<String> pools = getZooKeeperClient(factory).get().getChildren(rootPath, false); + List<String> pools = getZooKeeperClient(namespace).get().getChildren(rootPath, false); for (String pool : pools) { poolQueue.add(rootPath + "/" + pool); } @@ -318,11 +323,11 @@ public class DLAuditor { private void collectLedgersFromPool(String poolPath) throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException { - List<String> allocators = getZooKeeperClient(factory).get() + List<String> allocators = getZooKeeperClient(namespace).get() .getChildren(poolPath, false); for (String allocator : allocators) { String allocatorPath = poolPath + "/" + allocator; - byte[] data = getZooKeeperClient(factory).get().getData(allocatorPath, false, new Stat()); + byte[] data = getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat()); if (null != data && data.length > 0) { try { long ledgerId = DLUtils.bytes2LogSegmentId(data); @@ -341,30 +346,31 @@ public class DLAuditor { } private void collectLedgersFromDL(final URI uri, - final com.twitter.distributedlog.DistributedLogManagerFactory factory, + final DistributedLogNamespace namespace, final Set<Long> ledgers) throws IOException { logger.info("Enumerating {} to collect streams.", uri); - Collection<String> streams = factory.enumerateAllLogsInNamespace(); + Iterator<String> streams = namespace.getLogs(); final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>(); - streamQueue.addAll(streams); + while (streams.hasNext()) { + streamQueue.add(streams.next()); + } logger.info("Collected {} streams from uri {} : {}", - new Object[] { streams.size(), uri, streams }); + new Object[] { streamQueue.size(), uri, streams }); executeAction(streamQueue, 10, new Action<String>() { @Override public void execute(String stream) throws IOException { - collectLedgersFromStream(factory, stream, ledgers); + collectLedgersFromStream(namespace, stream, ledgers); } }); } - private List<Long> collectLedgersFromStream(com.twitter.distributedlog.DistributedLogManagerFactory factory, + private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace, String stream, Set<Long> ledgers) throws IOException { - DistributedLogManager dlm = factory.createDistributedLogManager(stream, - com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption.SharedClients); + DistributedLogManager dlm = namespace.openLog(stream); try { List<LogSegmentMetadata> segments = dlm.getLogSegments(); List<Long> sLedgers = new ArrayList<Long>(); @@ -388,21 +394,25 @@ public class DLAuditor { */ public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException { logger.info("Collecting stream space usage for {}.", uri); - com.twitter.distributedlog.DistributedLogManagerFactory factory = - new com.twitter.distributedlog.DistributedLogManagerFactory(conf, uri); + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + .conf(conf) + .uri(uri) + .build(); try { - return calculateStreamSpaceUsage(uri, factory); + return calculateStreamSpaceUsage(uri, namespace); } finally { - factory.close(); + namespace.close(); } } private Map<String, Long> calculateStreamSpaceUsage( - final URI uri, final com.twitter.distributedlog.DistributedLogManagerFactory factory) + final URI uri, final DistributedLogNamespace namespace) throws IOException { - Collection<String> streams = factory.enumerateAllLogsInNamespace(); + Iterator<String> streams = namespace.getLogs(); final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>(); - streamQueue.addAll(streams); + while (streams.hasNext()) { + streamQueue.add(streams.next()); + } final Map<String, Long> streamSpaceUsageMap = new ConcurrentSkipListMap<String, Long>(); @@ -412,7 +422,7 @@ public class DLAuditor { @Override public void execute(String stream) throws IOException { streamSpaceUsageMap.put(stream, - calculateStreamSpaceUsage(factory, stream)); + calculateStreamSpaceUsage(namespace, stream)); if (numStreamsCollected.incrementAndGet() % 1000 == 0) { logger.info("Calculated {} streams from uri {}.", numStreamsCollected.get(), uri); } @@ -422,16 +432,15 @@ public class DLAuditor { return streamSpaceUsageMap; } - private long calculateStreamSpaceUsage(final com.twitter.distributedlog.DistributedLogManagerFactory factory, + private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace, final String stream) throws IOException { - DistributedLogManager dlm = factory.createDistributedLogManager(stream, - com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption.SharedClients); + DistributedLogManager dlm = namespace.openLog(stream); long totalBytes = 0; try { List<LogSegmentMetadata> segments = dlm.getLogSegments(); for (LogSegmentMetadata segment : segments) { try { - LedgerHandle lh = getBookKeeperClient(factory).get().openLedgerNoRecovery(segment.getLogSegmentId(), + LedgerHandle lh = getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); totalBytes += lh.getLength(); lh.close(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java deleted file mode 100644 index dd78a4e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKDLUtils.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; - -import java.net.URI; - -/** - * Utils for bookkeeper based distributedlog implementation - */ -public class BKDLUtils { - - /** - * Is it a reserved stream name in bkdl namespace? - * - * @param name - * stream name - * @return true if it is reserved name, otherwise false. - */ - public static boolean isReservedStreamName(String name) { - return name.startsWith("."); - } - - /** - * Validate the configuration and uri. - * - * @param conf - * distributedlog configuration - * @param uri - * distributedlog uri - * @throws IllegalArgumentException - */ - public static void validateConfAndURI(DistributedLogConfiguration conf, URI uri) - throws IllegalArgumentException { - if (null == conf) { - throw new IllegalArgumentException("Incorrect Configuration"); - } else { - conf.validate(); - } - if ((null == uri) || (null == uri.getAuthority()) || (null == uri.getPath())) { - throw new IllegalArgumentException("Incorrect ZK URI"); - } - } - - /** - * Validate the stream name. - * - * @param nameOfStream - * name of stream - * @throws InvalidStreamNameException - */ - public static void validateName(String nameOfStream) - throws InvalidStreamNameException { - String reason = null; - char chars[] = nameOfStream.toCharArray(); - char c; - // validate the stream to see if meet zookeeper path's requirement - for (int i = 0; i < chars.length; i++) { - c = chars[i]; - - if (c == 0) { - reason = "null character not allowed @" + i; - break; - } else if (c == '/') { - reason = "'/' not allowed @" + i; - break; - } else if (c > '\u0000' && c < '\u001f' - || c > '\u007f' && c < '\u009F' - || c > '\ud800' && c < '\uf8ff' - || c > '\ufff0' && c < '\uffff') { - reason = "invalid charater @" + i; - break; - } - } - if (null != reason) { - throw new InvalidStreamNameException(nameOfStream, reason); - } - if (isReservedStreamName(nameOfStream)) { - throw new InvalidStreamNameException(nameOfStream, - "Stream Name is reserved"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java new file mode 100644 index 0000000..5921233 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java @@ -0,0 +1,631 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.distributedlog.BookKeeperClient; +import com.twitter.distributedlog.BookKeeperClientBuilder; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.MetadataAccessor; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.ZooKeeperClientBuilder; +import com.twitter.distributedlog.acl.AccessControlManager; +import com.twitter.distributedlog.acl.DefaultAccessControlManager; +import com.twitter.distributedlog.impl.acl.ZKAccessControlManager; +import com.twitter.distributedlog.bk.LedgerAllocator; +import com.twitter.distributedlog.bk.LedgerAllocatorUtils; +import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +import com.twitter.distributedlog.exceptions.AlreadyClosedException; +import com.twitter.distributedlog.exceptions.InvalidStreamNameException; +import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore; +import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; +import com.twitter.distributedlog.impl.subscription.ZKSubscriptionsStore; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; +import com.twitter.distributedlog.metadata.LogMetadataForReader; +import com.twitter.distributedlog.metadata.LogMetadataStore; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; +import com.twitter.distributedlog.namespace.NamespaceDriver; +import com.twitter.distributedlog.namespace.NamespaceDriverManager; +import com.twitter.distributedlog.subscription.SubscriptionsStore; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.distributedlog.util.Utils; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.bookkeeper.zookeeper.RetryPolicy; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.PathUtils; +import org.apache.zookeeper.data.Stat; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.util.HashedWheelTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.twitter.distributedlog.util.DLUtils.isReservedStreamName; +import static com.twitter.distributedlog.util.DLUtils.validateName; + +/** + * Manager for ZooKeeper/BookKeeper based namespace + */ +public class BKNamespaceDriver implements NamespaceDriver { + + private static Logger LOG = LoggerFactory.getLogger(BKNamespaceDriver.class); + + // register itself + static { + NamespaceDriverManager.registerDriver(DistributedLogConstants.BACKEND_BK, BKNamespaceDriver.class); + } + + /** + * Extract zk servers fro dl <i>namespace</i>. + * + * @param uri dl namespace + * @return zk servers + */ + public static String getZKServersFromDLUri(URI uri) { + return uri.getAuthority().replace(";", ","); + } + + // resources (passed from initialization) + private DistributedLogConfiguration conf; + private DynamicDistributedLogConfiguration dynConf; + private URI namespace; + private OrderedScheduler scheduler; + private FeatureProvider featureProvider; + private AsyncFailureInjector failureInjector; + private StatsLogger statsLogger; + private StatsLogger perLogStatsLogger; + private String clientId; + private int regionId; + + // + // resources (created internally and initialized at #initialize()) + // + + // namespace binding + private BKDLConfig bkdlConfig; + + // zookeeper clients + // NOTE: The actual zookeeper client is initialized lazily when it is referenced by + // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to + // keep builders and their client wrappers here, as they will be used when + // instantiating readers or writers. + private ZooKeeperClientBuilder sharedWriterZKCBuilder; + private ZooKeeperClient writerZKC; + private ZooKeeperClientBuilder sharedReaderZKCBuilder; + private ZooKeeperClient readerZKC; + // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by + // {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to + // keep builders and their client wrappers here, as they will be used when + // instantiating readers or writers. + private ClientSocketChannelFactory channelFactory; + private HashedWheelTimer requestTimer; + private BookKeeperClientBuilder sharedWriterBKCBuilder; + private BookKeeperClient writerBKC; + private BookKeeperClientBuilder sharedReaderBKCBuilder; + private BookKeeperClient readerBKC; + + // log stream metadata store + private LogMetadataStore metadataStore; + private LogStreamMetadataStore writerStreamMetadataStore; + private LogStreamMetadataStore readerStreamMetadataStore; + + // + // resources (lazily initialized) + // + + // ledger allocator + private LedgerAllocator allocator; + + // log segment entry stores + private LogSegmentEntryStore writerEntryStore; + private LogSegmentEntryStore readerEntryStore; + + // access control manager + private AccessControlManager accessControlManager; + + // + // states + // + protected boolean initialized = false; + protected AtomicBoolean closed = new AtomicBoolean(false); + + /** + * Public constructor for reflection. + */ + public BKNamespaceDriver() { + } + + @Override + public synchronized NamespaceDriver initialize(DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + URI namespace, + OrderedScheduler scheduler, + FeatureProvider featureProvider, + AsyncFailureInjector failureInjector, + StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + String clientId, + int regionId) throws IOException { + if (initialized) { + return this; + } + // validate the namespace + if ((null == namespace) || (null == namespace.getAuthority()) || (null == namespace.getPath())) { + throw new IOException("Incorrect distributedlog namespace : " + namespace); + } + + // initialize the resources + this.conf = conf; + this.dynConf = dynConf; + this.namespace = namespace; + this.scheduler = scheduler; + this.featureProvider = featureProvider; + this.failureInjector = failureInjector; + this.statsLogger = statsLogger; + this.perLogStatsLogger = perLogStatsLogger; + this.clientId = clientId; + this.regionId = regionId; + + // initialize the zookeeper clients + initializeZooKeeperClients(); + + // initialize the bookkeeper clients + initializeBookKeeperClients(); + + // propagate bkdlConfig to configuration + BKDLConfig.propagateConfiguration(bkdlConfig, conf); + + // initialize the log metadata & stream metadata store + initializeLogStreamMetadataStores(); + + // initialize other resources + initializeOtherResources(); + + initialized = true; + + LOG.info("Initialized BK namespace driver: clientId = {}, regionId = {}, federated = {}.", + new Object[]{clientId, regionId, bkdlConfig.isFederatedNamespace()}); + return this; + } + + private void initializeZooKeeperClients() throws IOException { + // Build the namespace zookeeper client + this.sharedWriterZKCBuilder = createZKClientBuilder( + String.format("dlzk:%s:factory_writer_shared", namespace), + conf, + getZKServersFromDLUri(namespace), + statsLogger.scope("dlzk_factory_writer_shared")); + this.writerZKC = sharedWriterZKCBuilder.build(); + + // Resolve namespace binding + this.bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, namespace); + + // Build zookeeper client for readers + if (bkdlConfig.getDlZkServersForWriter().equals(bkdlConfig.getDlZkServersForReader())) { + this.sharedReaderZKCBuilder = this.sharedWriterZKCBuilder; + } else { + this.sharedReaderZKCBuilder = createZKClientBuilder( + String.format("dlzk:%s:factory_reader_shared", namespace), + conf, + bkdlConfig.getDlZkServersForReader(), + statsLogger.scope("dlzk_factory_reader_shared")); + } + this.readerZKC = this.sharedReaderZKCBuilder.build(); + } + + private synchronized BKDLConfig getBkdlConfig() { + return bkdlConfig; + } + + private void initializeBookKeeperClients() throws IOException { + this.channelFactory = new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()), + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()), + conf.getBKClientNumberIOThreads()); + this.requestTimer = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(), + conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, + conf.getTimeoutTimerNumTicks()); + // Build bookkeeper client for writers + this.sharedWriterBKCBuilder = createBKCBuilder( + String.format("bk:%s:factory_writer_shared", namespace), + conf, + bkdlConfig.getBkZkServersForWriter(), + bkdlConfig.getBkLedgersPath(), + channelFactory, + requestTimer, + Optional.of(featureProvider.scope("bkc")), + statsLogger); + this.writerBKC = this.sharedWriterBKCBuilder.build(); + + // Build bookkeeper client for readers + if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) { + this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder; + } else { + this.sharedReaderBKCBuilder = createBKCBuilder( + String.format("bk:%s:factory_reader_shared", namespace), + conf, + bkdlConfig.getBkZkServersForReader(), + bkdlConfig.getBkLedgersPath(), + channelFactory, + requestTimer, + Optional.<FeatureProvider>absent(), + statsLogger); + } + this.readerBKC = this.sharedReaderBKCBuilder.build(); + } + + private void initializeLogStreamMetadataStores() throws IOException { + // log metadata store + if (bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled()) { + this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, readerZKC, scheduler); + } else { + this.metadataStore = new ZKLogMetadataStore(conf, namespace, readerZKC, scheduler); + } + + // create log stream metadata store + this.writerStreamMetadataStore = + new ZKLogStreamMetadataStore( + clientId, + conf, + writerZKC, + scheduler, + statsLogger); + this.readerStreamMetadataStore = + new ZKLogStreamMetadataStore( + clientId, + conf, + readerZKC, + scheduler, + statsLogger); + } + + @VisibleForTesting + public static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI uri) throws IOException { + String poolPath = conf.getLedgerAllocatorPoolPath(); + LOG.info("PoolPath is {}", poolPath); + if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith("/")) { + LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath); + throw new IOException("Invalid ledger allocator pool path specified : " + poolPath); + } + String poolName = conf.getLedgerAllocatorPoolName(); + if (null == poolName) { + LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool."); + throw new IOException("No ledger allocator name specified when enabling ledger allocator pool."); + } + String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName; + try { + PathUtils.validatePath(rootPath); + } catch (IllegalArgumentException iae) { + LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath); + throw new IOException("Invalid ledger allocator pool path specified : " + poolPath); + } + return rootPath; + } + + private void initializeOtherResources() throws IOException { + // Ledger allocator + if (conf.getEnableLedgerAllocatorPool()) { + String allocatorPoolPath = validateAndGetFullLedgerAllocatorPoolPath(conf, namespace); + allocator = LedgerAllocatorUtils.createLedgerAllocatorPool( + allocatorPoolPath, + conf.getLedgerAllocatorPoolCoreSize(), + conf, + writerZKC, + writerBKC, + scheduler); + if (null != allocator) { + allocator.start(); + } + LOG.info("Created ledger allocator pool under {} with size {}.", allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize()); + } else { + allocator = null; + } + + } + + private void checkState() throws IOException { + if (closed.get()) { + LOG.error("BK namespace driver {} is already closed", namespace); + throw new AlreadyClosedException("BK namespace driver " + namespace + " is already closed"); + } + } + + @Override + public void close() throws IOException { + if (!closed.compareAndSet(false, true)) { + return; + } + doClose(); + } + + private void doClose() { + if (null != accessControlManager) { + accessControlManager.close(); + LOG.info("Access Control Manager Stopped."); + } + + // Close the allocator + if (null != allocator) { + Utils.closeQuietly(allocator); + LOG.info("Ledger Allocator stopped."); + } + + // Shutdown log segment metadata stores + Utils.close(writerStreamMetadataStore); + Utils.close(readerStreamMetadataStore); + + writerBKC.close(); + readerBKC.close(); + writerZKC.close(); + readerZKC.close(); + // release bookkeeper resources + channelFactory.releaseExternalResources(); + LOG.info("Release external resources used by channel factory."); + requestTimer.stop(); + LOG.info("Stopped request timer"); + } + + @Override + public URI getUri() { + return namespace; + } + + @Override + public String getScheme() { + return DistributedLogConstants.BACKEND_BK; + } + + @Override + public LogMetadataStore getLogMetadataStore() { + return metadataStore; + } + + @Override + public LogStreamMetadataStore getLogStreamMetadataStore(Role role) { + if (Role.WRITER == role) { + return writerStreamMetadataStore; + } else { + return readerStreamMetadataStore; + } + } + + @Override + public LogSegmentEntryStore getLogSegmentEntryStore(Role role) { + if (Role.WRITER == role) { + return getWriterEntryStore(); + } else { + return getReaderEntryStore(); + } + } + + private LogSegmentEntryStore getWriterEntryStore() { + if (null == writerEntryStore) { + writerEntryStore = new BKLogSegmentEntryStore( + conf, + dynConf, + writerZKC, + writerBKC, + scheduler, + allocator, + statsLogger, + failureInjector); + } + return writerEntryStore; + } + + private LogSegmentEntryStore getReaderEntryStore() { + if (null == readerEntryStore) { + readerEntryStore = new BKLogSegmentEntryStore( + conf, + dynConf, + writerZKC, + readerBKC, + scheduler, + allocator, + statsLogger, + failureInjector); + } + return readerEntryStore; + } + + @Override + public AccessControlManager getAccessControlManager() throws IOException { + if (null == accessControlManager) { + String aclRootPath = getBkdlConfig().getACLRootPath(); + // Build the access control manager + if (aclRootPath == null) { + accessControlManager = DefaultAccessControlManager.INSTANCE; + LOG.info("Created default access control manager for {}", namespace); + } else { + if (!isReservedStreamName(aclRootPath)) { + throw new IOException("Invalid Access Control List Root Path : " + aclRootPath); + } + String zkRootPath = namespace.getPath() + "/" + aclRootPath; + LOG.info("Creating zk based access control manager @ {} for {}", + zkRootPath, namespace); + accessControlManager = new ZKAccessControlManager(conf, readerZKC, + zkRootPath, scheduler); + LOG.info("Created zk based access control manager @ {} for {}", + zkRootPath, namespace); + } + } + return accessControlManager; + } + + @Override + public SubscriptionsStore getSubscriptionsStore(String streamName) { + return new ZKSubscriptionsStore( + writerZKC, + LogMetadataForReader.getSubscribersPath(namespace, streamName, conf.getUnpartitionedStreamName())); + } + + // + // Legacy Intefaces + // + + @Override + public MetadataAccessor getMetadataAccessor(String streamName) + throws InvalidStreamNameException, IOException { + if (getBkdlConfig().isFederatedNamespace()) { + throw new UnsupportedOperationException(); + } + checkState(); + validateName(streamName); + return new ZKMetadataAccessor( + streamName, + conf, + namespace, + sharedWriterZKCBuilder, + sharedReaderZKCBuilder, + statsLogger); + } + + public Map<String, byte[]> enumerateLogsWithMetadataInNamespace() + throws IOException, IllegalArgumentException { + String namespaceRootPath = namespace.getPath(); + HashMap<String, byte[]> result = new HashMap<String, byte[]>(); + ZooKeeperClient zkc = writerZKC; + try { + ZooKeeper zk = Utils.sync(zkc, namespaceRootPath); + Stat currentStat = zk.exists(namespaceRootPath, false); + if (currentStat == null) { + return result; + } + List<String> children = zk.getChildren(namespaceRootPath, false); + for(String child: children) { + if (isReservedStreamName(child)) { + continue; + } + String zkPath = String.format("%s/%s", namespaceRootPath, child); + currentStat = zk.exists(zkPath, false); + if (currentStat == null) { + result.put(child, new byte[0]); + } else { + result.put(child, zk.getData(zkPath, false, currentStat)); + } + } + } catch (InterruptedException ie) { + LOG.error("Interrupted while deleting " + namespaceRootPath, ie); + throw new IOException("Interrupted while reading " + namespaceRootPath, ie); + } catch (KeeperException ke) { + LOG.error("Error reading" + namespaceRootPath + "entry in zookeeper", ke); + throw new IOException("Error reading" + namespaceRootPath + "entry in zookeeper", ke); + } + return result; + } + + // + // Zk & Bk Utils + // + + public static ZooKeeperClientBuilder createZKClientBuilder(String zkcName, + DistributedLogConfiguration conf, + String zkServers, + StatsLogger statsLogger) { + RetryPolicy retryPolicy = null; + if (conf.getZKNumRetries() > 0) { + retryPolicy = new BoundExponentialBackoffRetryPolicy( + conf.getZKRetryBackoffStartMillis(), + conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); + } + ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder() + .name(zkcName) + .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) + .retryThreadCount(conf.getZKClientNumberRetryThreads()) + .requestRateLimit(conf.getZKRequestRateLimit()) + .zkServers(zkServers) + .retryPolicy(retryPolicy) + .statsLogger(statsLogger) + .zkAclId(conf.getZkAclId()); + LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}," + + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(), + conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(), + conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() }); + return builder; + } + + private BookKeeperClientBuilder createBKCBuilder(String bkcName, + DistributedLogConfiguration conf, + String zkServers, + String ledgersPath, + ClientSocketChannelFactory channelFactory, + HashedWheelTimer requestTimer, + Optional<FeatureProvider> featureProviderOptional, + StatsLogger statsLogger) { + BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder() + .name(bkcName) + .dlConfig(conf) + .zkServers(zkServers) + .ledgersPath(ledgersPath) + .channelFactory(channelFactory) + .requestTimer(requestTimer) + .featureProvider(featureProviderOptional) + .statsLogger(statsLogger); + LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}", + new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() }); + return builder; + } + + // + // Test Methods + // + + @VisibleForTesting + public ZooKeeperClient getWriterZKC() { + return writerZKC; + } + + @VisibleForTesting + public BookKeeperClient getReaderBKC() { + return readerBKC; + } + + @VisibleForTesting + public AsyncFailureInjector getFailureInjector() { + return this.failureInjector; + } + + @VisibleForTesting + public LogStreamMetadataStore getWriterStreamMetadataStore() { + return writerStreamMetadataStore; + } + + @VisibleForTesting + public LedgerAllocator getLedgerAllocator() { + return allocator; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java index b84ab2e..50b1405 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java @@ -36,7 +36,7 @@ import java.net.URI; import java.util.Iterator; import java.util.List; -import static com.twitter.distributedlog.impl.BKDLUtils.*; +import static com.twitter.distributedlog.util.DLUtils.*; /** * ZooKeeper based log metadata store