http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java deleted file mode 100644 index 3838e96..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.server.handlers; - -import org.jboss.netty.channel.Channel; -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest; -import org.apache.hedwig.protoextensions.PubSubResponseUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.netty.ServerStats; -import org.apache.hedwig.server.netty.ServerStats.OpStats; -import org.apache.hedwig.server.netty.UmbrellaHandler; -import org.apache.hedwig.server.subscriptions.SubscriptionManager; -import org.apache.hedwig.server.topics.TopicManager; -import org.apache.hedwig.util.Callback; - -public class UnsubscribeHandler extends BaseHandler { - SubscriptionManager subMgr; - DeliveryManager deliveryMgr; - SubscriptionChannelManager subChannelMgr; - // op stats - final OpStats unsubStats; - - public UnsubscribeHandler(ServerConfiguration cfg, - TopicManager tm, - SubscriptionManager subMgr, - DeliveryManager deliveryMgr, - SubscriptionChannelManager subChannelMgr) { - super(tm, cfg); - this.subMgr = subMgr; - this.deliveryMgr = deliveryMgr; - this.subChannelMgr = subChannelMgr; - unsubStats = ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE); - } - - @Override - public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) { - if (!request.hasUnsubscribeRequest()) { - UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(), - "Missing unsubscribe request data"); - unsubStats.incrementFailedOps(); - return; - } - - final UnsubscribeRequest unsubRequest = request.getUnsubscribeRequest(); - final ByteString 
topic = request.getTopic(); - final ByteString subscriberId = unsubRequest.getSubscriberId(); - - final long requestTime = MathUtils.now(); - subMgr.unsubscribe(topic, subscriberId, new Callback<Void>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())); - unsubStats.incrementFailedOps(); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - // we should not close the channel in delivery manager - // since client waits the response for closeSubscription request - // client side would close the channel - deliveryMgr.stopServingSubscriber(topic, subscriberId, null, - new Callback<Void>() { - @Override - public void operationFailed(Object ctx, PubSubException exception) { - channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())); - unsubStats.incrementFailedOps(); - } - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - // remove the topic subscription from subscription channels - subChannelMgr.remove(new TopicSubscriber(topic, subscriberId), - channel); - channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId())); - unsubStats.updateLatency(System.currentTimeMillis() - requestTime); - } - }, ctx); - } - }, null); - - } - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java deleted file mode 100644 index f0081d9..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigJMXService.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.jmx; - -/** - * An implementor of this interface is basiclly responsible for jmx beans. 
- */ -public interface HedwigJMXService { - /** - * register jmx - * - * @param parent - * Parent JMX Bean - */ - public void registerJMX(HedwigMBeanInfo parent); - - /** - * unregister jmx - */ - public void unregisterJMX(); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java deleted file mode 100644 index 866a217..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanInfo.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.jmx; - -import org.apache.zookeeper.jmx.ZKMBeanInfo; - -/** - * Hedwig MBean info interface. 
- */ -public interface HedwigMBeanInfo extends ZKMBeanInfo { -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java b/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java deleted file mode 100644 index 563cae8..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/jmx/HedwigMBeanRegistry.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.server.jmx; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.bookkeeper.jmx.BKMBeanRegistry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class provides a unified interface for registering/unregistering of - * Hedwig MBeans with the platform MBean server. 
- */ -public class HedwigMBeanRegistry extends BKMBeanRegistry { - - static final String SERVICE = "org.apache.HedwigServer"; - - static HedwigMBeanRegistry instance = new HedwigMBeanRegistry(); - - public static HedwigMBeanRegistry getInstance(){ - return instance; - } - - @Override - protected String getDomainName() { - return SERVICE; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java deleted file mode 100644 index a4253f8..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/FactoryLayout.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.apache.hedwig.server.meta; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.TextFormat; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.zookeeper.ZkUtils; -import static com.google.common.base.Charsets.UTF_8; - -/** - * This class encapsulates metadata manager layout information - * that is persistently stored in zookeeper. - * It provides parsing and serialization methods of such information. - * - */ -public class FactoryLayout { - private static final Logger logger = LoggerFactory.getLogger(FactoryLayout.class); - - // metadata manager name - public static final String NAME = "METADATA"; - // Znode name to store layout information - public static final String LAYOUT_ZNODE = "LAYOUT"; - public static final String LSEP = "\n"; - - private ManagerMeta managerMeta; - - /** - * Construct metadata manager factory layout. - * - * @param meta - * Meta describes what kind of factory used. 
- */ - public FactoryLayout(ManagerMeta meta) { - this.managerMeta = meta; - } - - public static String getFactoryLayoutPath(StringBuilder sb, ServerConfiguration cfg) { - return cfg.getZkManagersPrefix(sb).append("/").append(NAME) - .append("/").append(LAYOUT_ZNODE).toString(); - } - - public ManagerMeta getManagerMeta() { - return managerMeta; - } - - /** - * Store the factory layout into zookeeper - * - * @param zk - * ZooKeeper Handle - * @param cfg - * Server Configuration Object - * @throws KeeperException - * @throws IOException - * @throws InterruptedException - */ - public void store(ZooKeeper zk, ServerConfiguration cfg) - throws KeeperException, IOException, InterruptedException { - String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg); - - byte[] layoutData = TextFormat.printToString(managerMeta).getBytes(UTF_8); - ZkUtils.createFullPathOptimistic(zk, factoryLayoutPath, layoutData, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - - @Override - public int hashCode() { - return managerMeta.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (null == o || - !(o instanceof FactoryLayout)) { - return false; - } - FactoryLayout other = (FactoryLayout)o; - return managerMeta.equals(other.managerMeta); - } - - @Override - public String toString() { - return TextFormat.printToString(managerMeta); - } - - /** - * Read factory layout from zookeeper - * - * @param zk - * ZooKeeper Client - * @param cfg - * Server configuration object - * @return Factory layout, or null if none set in zookeeper - */ - public static FactoryLayout readLayout(final ZooKeeper zk, - final ServerConfiguration cfg) - throws IOException, KeeperException { - String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg); - byte[] layoutData; - try { - layoutData = zk.getData(factoryLayoutPath, false, null); - } catch (KeeperException.NoNodeException nne) { - return null; - } catch (InterruptedException ie) { - throw new IOException(ie); - 
} - ManagerMeta meta; - try { - BufferedReader reader = new BufferedReader( - new StringReader(new String(layoutData, UTF_8))); - ManagerMeta.Builder metaBuilder = ManagerMeta.newBuilder(); - TextFormat.merge(reader, metaBuilder); - meta = metaBuilder.build(); - } catch (InvalidProtocolBufferException ipbe) { - throw new IOException("Corrupted factory layout : ", ipbe); - } - - return new FactoryLayout(meta); - } - - /** - * Remove the factory layout from ZooKeeper. - * - * @param zk - * ZooKeeper instance - * @param cfg - * Server configuration object - * @throws KeeperException - * @throws InterruptedException - */ - public static void deleteLayout(ZooKeeper zk, ServerConfiguration cfg) - throws KeeperException, InterruptedException { - String factoryLayoutPath = getFactoryLayoutPath(new StringBuilder(), cfg); - zk.delete(factoryLayoutPath, -1); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java deleted file mode 100644 index 129d03d..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MetadataManagerFactory.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.meta; - -import java.io.IOException; -import java.util.Iterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; - -import com.google.protobuf.ByteString; - -/** - * Metadata Manager used to manage metadata used by hedwig. - */ -public abstract class MetadataManagerFactory { - - private static final Logger LOG = LoggerFactory.getLogger(MetadataManagerFactory.class); - - /** - * Return current factory version. - * - * @return current version used by factory. - */ - public abstract int getCurrentVersion(); - - /** - * Initialize the metadata manager factory with given - * configuration and version. - * - * @param cfg - * Server configuration object - * @param zk - * ZooKeeper handler - * @param version - * Manager version - * @return metadata manager factory - * @throws IOException when fail to initialize the manager. - */ - protected abstract MetadataManagerFactory initialize( - ServerConfiguration cfg, ZooKeeper zk, int version) - throws IOException; - - /** - * Uninitialize the factory. - * - * @throws IOException when fail to shutdown the factory. - */ - public abstract void shutdown() throws IOException; - - /** - * Iterate over the topics list. - * Used by HedwigConsole to list available topics. 
- * - * @return iterator of the topics list. - * @throws IOException - */ - public abstract Iterator<ByteString> getTopics() throws IOException; - - /** - * Create topic persistence manager. - * - * @return topic persistence manager - */ - public abstract TopicPersistenceManager newTopicPersistenceManager(); - - /** - * Create subscription data manager. - * - * @return subscription data manager. - */ - public abstract SubscriptionDataManager newSubscriptionDataManager(); - - /** - * Create topic ownership manager. - * - * @return topic ownership manager. - */ - public abstract TopicOwnershipManager newTopicOwnershipManager(); - - /** - * Format the metadata for Hedwig. - * - * @param cfg - * Configuration instance - * @param zk - * ZooKeeper instance - */ - public abstract void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException; - - /** - * Create new Metadata Manager Factory. - * - * @param conf - * Configuration Object. - * @param zk - * ZooKeeper Client Handle, talk to zk to know which manager factory is used. - * @return new manager factory. - * @throws IOException - */ - public static MetadataManagerFactory newMetadataManagerFactory( - final ServerConfiguration conf, final ZooKeeper zk) - throws IOException, KeeperException, InterruptedException { - Class<? 
extends MetadataManagerFactory> factoryClass; - try { - factoryClass = conf.getMetadataManagerFactoryClass(); - } catch (Exception e) { - throw new IOException("Failed to get metadata manager factory class from configuration : ", e); - } - // check that the configured manager is - // compatible with the existing layout - FactoryLayout layout = FactoryLayout.readLayout(zk, conf); - if (layout == null) { // no existing layout - return createMetadataManagerFactory(conf, zk, factoryClass); - } - LOG.debug("read meta layout {}", layout); - - if (factoryClass != null && - !layout.getManagerMeta().getManagerImpl().equals(factoryClass.getName())) { - throw new IOException("Configured metadata manager factory " + factoryClass.getName() - + " does not match existing factory " + layout.getManagerMeta().getManagerImpl()); - } - if (factoryClass == null) { - // no factory specified in configuration - String factoryClsName = layout.getManagerMeta().getManagerImpl(); - try { - Class<?> theCls = Class.forName(factoryClsName); - if (!MetadataManagerFactory.class.isAssignableFrom(theCls)) { - throw new IOException("Wrong metadata manager factory " + factoryClsName); - } - factoryClass = theCls.asSubclass(MetadataManagerFactory.class); - } catch (ClassNotFoundException cnfe) { - throw new IOException("No class found to instantiate metadata manager factory " + factoryClsName); - } - } - // instantiate the metadata manager factory - MetadataManagerFactory managerFactory; - try { - managerFactory = ReflectionUtils.newInstance(factoryClass); - } catch (Throwable t) { - throw new IOException("Failed to instantiate metadata manager factory : " + factoryClass, t); - } - return managerFactory.initialize(conf, zk, layout.getManagerMeta().getManagerVersion()); - } - - /** - * Create metadata manager factory and write factory layout to ZooKeeper. - * - * @param cfg - * Server Configuration object. - * @param zk - * ZooKeeper instance. - * @param factoryClass - * Metadata Manager Factory Class. 
- * @return metadata manager factory instance. - * @throws IOException - * @throws KeeperException - * @throws InterruptedException - */ - public static MetadataManagerFactory createMetadataManagerFactory( - ServerConfiguration cfg, ZooKeeper zk, - Class<? extends MetadataManagerFactory> factoryClass) - throws IOException, KeeperException, InterruptedException { - // use default manager if no one provided - if (factoryClass == null) { - factoryClass = ZkMetadataManagerFactory.class; - } - - MetadataManagerFactory managerFactory; - try { - managerFactory = ReflectionUtils.newInstance(factoryClass); - } catch (Throwable t) { - throw new IOException("Fail to instantiate metadata manager factory : " + factoryClass, t); - } - ManagerMeta managerMeta = ManagerMeta.newBuilder() - .setManagerImpl(factoryClass.getName()) - .setManagerVersion(managerFactory.getCurrentVersion()) - .build(); - FactoryLayout layout = new FactoryLayout(managerMeta); - try { - layout.store(zk, cfg); - } catch (KeeperException.NodeExistsException nee) { - FactoryLayout layout2 = FactoryLayout.readLayout(zk, cfg); - if (!layout2.equals(layout)) { - throw new IOException("Contention writing to layout to zookeeper, " - + " other layout " + layout2 + " is incompatible with our " - + "layout " + layout); - } - } - return managerFactory.initialize(cfg, zk, layout.getManagerMeta().getManagerVersion()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java deleted file mode 100644 index b44ca91..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/MsMetadataManagerFactory.java +++ /dev/null @@ -1,867 +0,0 @@ -/** - * Licensed 
to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.meta; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import static com.google.common.base.Charsets.UTF_8; -import com.google.protobuf.ByteString; -import com.google.protobuf.TextFormat; -import com.google.protobuf.TextFormat.ParseException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.bookkeeper.metastore.MetaStore; -import org.apache.bookkeeper.metastore.MetastoreCallback; -import org.apache.bookkeeper.metastore.MetastoreCursor; -import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback; -import org.apache.bookkeeper.metastore.MetastoreException; -import org.apache.bookkeeper.metastore.MetastoreFactory; -import org.apache.bookkeeper.metastore.MetastoreScannableTable; -import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order; -import org.apache.bookkeeper.metastore.MetastoreTable; -import org.apache.bookkeeper.metastore.MetastoreUtils; - -import static org.apache.bookkeeper.metastore.MetastoreTable.*; -import 
org.apache.bookkeeper.metastore.MetastoreTableItem; -import org.apache.bookkeeper.metastore.MSException; -import org.apache.bookkeeper.metastore.Value; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState; -import org.apache.hedwig.protoextensions.SubscriptionStateUtils; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.topics.HubInfo; -import org.apache.hedwig.util.Callback; - -import org.apache.zookeeper.ZooKeeper; - -/** - * MetadataManagerFactory for plug-in metadata storage. - */ -public class MsMetadataManagerFactory extends MetadataManagerFactory { - protected final static Logger logger = LoggerFactory.getLogger(MsMetadataManagerFactory.class); - - static final String UTF8 = "UTF-8"; - - static final int CUR_VERSION = 1; - - static final String OWNER_TABLE_NAME = "owner"; - static final String PERSIST_TABLE_NAME = "persist"; - static final String SUB_TABLE_NAME = "sub"; - - static class SyncResult<T> { - T value; - int rc; - boolean finished = false; - - public synchronized void complete(int rc, T value) { - this.rc = rc; - this.value = value; - finished = true; - - notify(); - } - - public synchronized void block() throws InterruptedException { - while (!finished) { - wait(); - } - } - - public int getReturnCode() { - return rc; - } - - public T getValue() { - return value; - } - } - - MetaStore metastore; - MetastoreTable ownerTable; - MetastoreTable persistTable; - MetastoreScannableTable subTable; - ServerConfiguration cfg; - - @Override - public MetadataManagerFactory 
initialize(ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException { - if (CUR_VERSION != version) { - throw new IOException("Incompatible MsMetadataManagerFactory version " + version - + " found, expected version " + CUR_VERSION); - } - this.cfg = cfg; - try { - metastore = MetastoreFactory.createMetaStore(cfg.getMetastoreImplClass()); - // TODO: need to store metastore class and version in some place. - metastore.init(cfg.getConf(), metastore.getVersion()); - } catch (Exception e) { - throw new IOException("Load metastore failed : ", e); - } - - try { - ownerTable = metastore.createTable(OWNER_TABLE_NAME); - if (ownerTable == null) { - throw new IOException("create owner table failed"); - } - - persistTable = metastore.createTable(PERSIST_TABLE_NAME); - if (persistTable == null) { - throw new IOException("create persistence table failed"); - } - - subTable = metastore.createScannableTable(SUB_TABLE_NAME); - if (subTable == null) { - throw new IOException("create subscription table failed"); - } - } catch (MetastoreException me) { - throw new IOException("Failed to create tables : ", me); - } - - return this; - } - - @Override - public int getCurrentVersion() { - return CUR_VERSION; - } - - @Override - public void shutdown() { - if (metastore == null) { - return; - } - - if (ownerTable != null) { - ownerTable.close(); - ownerTable = null; - } - - if (persistTable != null) { - persistTable.close(); - persistTable = null; - } - - if (subTable != null) { - subTable.close(); - subTable = null; - } - - metastore.close(); - metastore = null; - } - - @Override - public Iterator<ByteString> getTopics() throws IOException { - SyncResult<MetastoreCursor> syn = new SyncResult<MetastoreCursor>(); - persistTable.openCursor(NON_FIELDS, new MetastoreCallback<MetastoreCursor>() { - public void complete(int rc, MetastoreCursor cursor, Object ctx) { - @SuppressWarnings("unchecked") - SyncResult<MetastoreCursor> syn = (SyncResult<MetastoreCursor>) ctx; - 
syn.complete(rc, cursor); - } - }, syn); - try { - syn.block(); - } catch (Exception e) { - throw new IOException("Interrupted on getting topics list : ", e); - } - - if (syn.getReturnCode() != MSException.Code.OK.getCode()) { - throw new IOException("Failed to get topics : ", MSException.create( - MSException.Code.get(syn.getReturnCode()), "")); - } - - final MetastoreCursor cursor = syn.getValue(); - return new Iterator<ByteString>() { - Iterator<MetastoreTableItem> itemIter = null; - - @Override - public boolean hasNext() { - while (null == itemIter || !itemIter.hasNext()) { - if (!cursor.hasMoreEntries()) { - return false; - } - - try { - itemIter = cursor.readEntries(cfg.getMetastoreMaxEntriesPerScan()); - } catch (MSException mse) { - logger.warn("Interrupted when iterating the topics list : ", mse); - return false; - } - } - return true; - } - - @Override - public ByteString next() { - MetastoreTableItem t = itemIter.next(); - return ByteString.copyFromUtf8(t.getKey()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Doesn't support remove topic from topic iterator."); - } - }; - } - - @Override - public TopicOwnershipManager newTopicOwnershipManager() { - return new MsTopicOwnershipManagerImpl(ownerTable); - } - - static class MsTopicOwnershipManagerImpl implements TopicOwnershipManager { - - static final String OWNER_FIELD = "owner"; - - final MetastoreTable ownerTable; - - MsTopicOwnershipManagerImpl(MetastoreTable ownerTable) { - this.ownerTable = ownerTable; - } - - @Override - public void close() throws IOException { - // do nothing - } - - @Override - public void readOwnerInfo(final ByteString topic, final Callback<Versioned<HubInfo>> callback, Object ctx) { - ownerTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() { - @Override - public void complete(int rc, Versioned<Value> value, Object ctx) { - if (MSException.Code.NoKey.getCode() == rc) { - callback.operationFinished(ctx, null); - 
return; - } - - if (MSException.Code.OK.getCode() != rc) { - logErrorAndFinishOperation("Could not read ownership for topic " + topic.toStringUtf8(), - callback, ctx, rc); - return; - } - - HubInfo owner = null; - try { - byte[] data = value.getValue().getField(OWNER_FIELD); - if (data != null) { - owner = HubInfo.parse(new String(data, UTF_8)); - } - } catch (HubInfo.InvalidHubInfoException ihie) { - logger.warn("Failed to parse hub info for topic " + topic.toStringUtf8(), ihie); - } - Version version = value.getVersion(); - callback.operationFinished(ctx, new Versioned<HubInfo>(owner, version)); - } - }, ctx); - } - - @Override - public void writeOwnerInfo(final ByteString topic, final HubInfo owner, final Version version, - final Callback<Version> callback, Object ctx) { - Value value = new Value(); - value.setField(OWNER_FIELD, owner.toString().getBytes(UTF_8)); - - ownerTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() { - @Override - public void complete(int rc, Version ver, Object ctx) { - if (MSException.Code.OK.getCode() == rc) { - callback.operationFinished(ctx, ver); - return; - } else if (MSException.Code.NoKey.getCode() == rc) { - // no node - callback.operationFailed( - ctx, - PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic " - + topic.toStringUtf8())); - return; - } else if (MSException.Code.KeyExists.getCode() == rc) { - // key exists - callback.operationFailed( - ctx, - PubSubException.create(StatusCode.TOPIC_OWNER_INFO_EXISTS, "Owner info of topic " - + topic.toStringUtf8() + " existed.")); - return; - } else if (MSException.Code.BadVersion.getCode() == rc) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to update owner info of topic " + topic.toStringUtf8())); - return; - } else { - logErrorAndFinishOperation("Failed to update ownership of topic " + topic.toStringUtf8() - + " to " + owner, callback, ctx, 
rc); - return; - } - } - }, ctx); - } - - @Override - public void deleteOwnerInfo(final ByteString topic, Version version, final Callback<Void> callback, - Object ctx) { - ownerTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() { - @Override - public void complete(int rc, Void value, Object ctx) { - if (MSException.Code.OK.getCode() == rc) { - logger.debug("Successfully deleted owner info for topic {}", topic.toStringUtf8()); - callback.operationFinished(ctx, null); - return; - } else if (MSException.Code.NoKey.getCode() == rc) { - // no node - callback.operationFailed( - ctx, - PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic " - + topic.toStringUtf8())); - return; - } else if (MSException.Code.BadVersion.getCode() == rc) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to delete owner info of topic " + topic.toStringUtf8())); - return; - } else { - logErrorAndFinishOperation("Failed to delete owner info for topic " + topic.toStringUtf8(), - callback, ctx, rc); - return; - } - } - }, ctx); - } - } - - @Override - public TopicPersistenceManager newTopicPersistenceManager() { - return new MsTopicPersistenceManagerImpl(persistTable); - } - - static class MsTopicPersistenceManagerImpl implements TopicPersistenceManager { - - static final String PERSIST_FIELD = "prst"; - - final MetastoreTable persistTable; - - MsTopicPersistenceManagerImpl(MetastoreTable persistTable) { - this.persistTable = persistTable; - } - - @Override - public void close() throws IOException { - // do nothing - } - - @Override - public void readTopicPersistenceInfo(final ByteString topic, final Callback<Versioned<LedgerRanges>> callback, - Object ctx) { - persistTable.get(topic.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() { - @Override - public void complete(int rc, Versioned<Value> value, Object ctx) { - if (MSException.Code.OK.getCode() == rc) { - 
byte[] data = value.getValue().getField(PERSIST_FIELD); - if (data != null) { - parseAndReturnTopicLedgerRanges(topic, data, value.getVersion(), callback, ctx); - } else { // null data is same as NoKey - callback.operationFinished(ctx, null); - } - } else if (MSException.Code.NoKey.getCode() == rc) { - callback.operationFinished(ctx, null); - } else { - logErrorAndFinishOperation("Could not read ledgers node for topic " + topic.toStringUtf8(), - callback, ctx, rc); - } - } - }, ctx); - } - - /** - * Parse ledger ranges data and return it thru callback. - * - * @param topic - * Topic name - * @param data - * Topic Ledger Ranges data - * @param version - * Version of the topic ledger ranges data - * @param callback - * Callback to return ledger ranges - * @param ctx - * Context of the callback - */ - private void parseAndReturnTopicLedgerRanges(ByteString topic, byte[] data, Version version, - Callback<Versioned<LedgerRanges>> callback, Object ctx) { - try { - LedgerRanges.Builder rangesBuilder = LedgerRanges.newBuilder(); - TextFormat.merge(new String(data, UTF8), rangesBuilder); - LedgerRanges lr = rangesBuilder.build(); - Versioned<LedgerRanges> ranges = new Versioned<LedgerRanges>(lr, version); - callback.operationFinished(ctx, ranges); - } catch (ParseException e) { - StringBuilder sb = new StringBuilder(); - sb.append("Ledger ranges for topic ").append(topic.toStringUtf8()) - .append(" could not be deserialized."); - String msg = sb.toString(); - logger.error(msg, e); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - } catch (UnsupportedEncodingException uee) { - StringBuilder sb = new StringBuilder(); - sb.append("Ledger ranges for topic ").append(topic.toStringUtf8()).append(" is not UTF-8 encoded."); - String msg = sb.toString(); - logger.error(msg, uee); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - } - } - - @Override - public void writeTopicPersistenceInfo(final ByteString 
topic, LedgerRanges ranges, final Version version, - final Callback<Version> callback, Object ctx) { - Value value = new Value(); - value.setField(PERSIST_FIELD, TextFormat.printToString(ranges).getBytes(UTF_8)); - - persistTable.put(topic.toStringUtf8(), value, version, new MetastoreCallback<Version>() { - @Override - public void complete(int rc, Version ver, Object ctx) { - if (MSException.Code.OK.getCode() == rc) { - callback.operationFinished(ctx, ver); - return; - } else if (MSException.Code.NoKey.getCode() == rc) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO, - "No persistence info found for topic " + topic.toStringUtf8())); - return; - } else if (MSException.Code.KeyExists.getCode() == rc) { - // key exists - callback.operationFailed(ctx, PubSubException.create(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, - "Persistence info of topic " + topic.toStringUtf8() + " existed.")); - return; - } else if (MSException.Code.BadVersion.getCode() == rc) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to update persistence info of topic " + topic.toStringUtf8())); - return; - } else { - logErrorAndFinishOperation("Could not write ledgers node for topic " + topic.toStringUtf8(), - callback, ctx, rc); - } - } - }, ctx); - } - - @Override - public void deleteTopicPersistenceInfo(final ByteString topic, final Version version, - final Callback<Void> callback, Object ctx) { - persistTable.remove(topic.toStringUtf8(), version, new MetastoreCallback<Void>() { - @Override - public void complete(int rc, Void value, Object ctx) { - if (MSException.Code.OK.getCode() == rc) { - logger.debug("Successfully deleted persistence info for topic {}.", topic.toStringUtf8()); - callback.operationFinished(ctx, null); - return; - } else if (MSException.Code.NoKey.getCode() == rc) { - // no node - callback.operationFailed(ctx, 
PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO, - "No persistence info found for topic " + topic.toStringUtf8())); - return; - } else if (MSException.Code.BadVersion.getCode() == rc) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to delete persistence info of topic " + topic.toStringUtf8())); - return; - } else { - logErrorAndFinishOperation("Failed to delete persistence info topic: " + topic.toStringUtf8() - + ", version: " + version, callback, ctx, rc, StatusCode.SERVICE_DOWN); - return; - } - } - }, ctx); - } - } - - @Override - public SubscriptionDataManager newSubscriptionDataManager() { - return new MsSubscriptionDataManagerImpl(cfg, subTable); - } - - static class MsSubscriptionDataManagerImpl implements SubscriptionDataManager { - - static final String SUB_STATE_FIELD = "sub_state"; - static final String SUB_PREFS_FIELD = "sub_preferences"; - - static final char TOPIC_SUB_FIRST_SEPARATOR = '\001'; - static final char TOPIC_SUB_LAST_SEPARATOR = '\002'; - - final ServerConfiguration cfg; - final MetastoreScannableTable subTable; - - MsSubscriptionDataManagerImpl(ServerConfiguration cfg, MetastoreScannableTable subTable) { - this.cfg = cfg; - this.subTable = subTable; - } - - @Override - public void close() throws IOException { - // do nothing - } - - private String getSubscriptionKey(ByteString topic, ByteString subscriberId) { - return new StringBuilder(topic.toStringUtf8()).append(TOPIC_SUB_FIRST_SEPARATOR) - .append(subscriberId.toStringUtf8()).toString(); - } - - private Value subscriptionData2Value(SubscriptionData subData) { - Value value = new Value(); - if (subData.hasState()) { - value.setField(SUB_STATE_FIELD, TextFormat.printToString(subData.getState()).getBytes(UTF_8)); - } - if (subData.hasPreferences()) { - value.setField(SUB_PREFS_FIELD, TextFormat.printToString(subData.getPreferences()).getBytes(UTF_8)); - } - return value; - } - - @Override - public void 
createSubscriptionData(final ByteString topic, final ByteString subscriberId, - final SubscriptionData subData, final Callback<Version> callback, Object ctx) { - String key = getSubscriptionKey(topic, subscriberId); - Value value = subscriptionData2Value(subData); - - subTable.put(key, value, Version.NEW, new MetastoreCallback<Version>() { - @Override - public void complete(int rc, Version ver, Object ctx) { - if (rc == MSException.Code.OK.getCode()) { - if (logger.isDebugEnabled()) { - logger.debug("Successfully create subscription for topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: " - + SubscriptionStateUtils.toString(subData)); - } - callback.operationFinished(ctx, ver); - } else if (rc == MSException.Code.KeyExists.getCode()) { - callback.operationFailed(ctx, PubSubException.create( - StatusCode.SUBSCRIPTION_STATE_EXISTS, - "Subscription data for (topic:" + topic.toStringUtf8() + ", subscriber:" - + subscriberId.toStringUtf8() + ") existed.")); - return; - } else { - logErrorAndFinishOperation("Failed to create topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: " - + SubscriptionStateUtils.toString(subData), callback, ctx, rc); - } - } - }, ctx); - } - - @Override - public boolean isPartialUpdateSupported() { - // TODO: Here we assume Metastore support partial update, but this - // maybe incorrect. 
- return true; - } - - @Override - public void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId, - final SubscriptionData subData, final Version version, final Callback<Version> callback, - final Object ctx) { - updateSubscriptionData(topic, subscriberId, subData, version, callback, ctx); - } - - @Override - public void updateSubscriptionData(final ByteString topic, final ByteString subscriberId, - final SubscriptionData subData, final Version version, final Callback<Version> callback, - final Object ctx) { - String key = getSubscriptionKey(topic, subscriberId); - Value value = subscriptionData2Value(subData); - - subTable.put(key, value, version, new MetastoreCallback<Version>() { - @Override - public void complete(int rc, Version version, Object ctx) { - if (rc == MSException.Code.OK.getCode()) { - if (logger.isDebugEnabled()) { - logger.debug("Successfully updated subscription data for topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: " - + SubscriptionStateUtils.toString(subData) + ", version: " + version); - } - callback.operationFinished(ctx, version); - } else if (rc == MSException.Code.NoKey.getCode()) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE, - "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:" - + subscriberId.toStringUtf8() + ").")); - return; - } else if (rc == MSException.Code.BadVersion.getCode()) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to update subscription data of topic " + topic.toStringUtf8() - + " subscriberId " + subscriberId)); - return; - } else { - logErrorAndFinishOperation( - "Failed to update subscription data for topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: " - + SubscriptionStateUtils.toString(subData) + ", version: " + version, callback, - 
ctx, rc); - } - } - }, ctx); - } - - @Override - public void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version, - final Callback<Void> callback, Object ctx) { - String key = getSubscriptionKey(topic, subscriberId); - subTable.remove(key, version, new MetastoreCallback<Void>() { - @Override - public void complete(int rc, Void value, Object ctx) { - if (rc == MSException.Code.OK.getCode()) { - logger.debug("Successfully delete subscription for topic: {}, subscriberId: {}.", - topic.toStringUtf8(), subscriberId.toStringUtf8()); - callback.operationFinished(ctx, null); - return; - } else if (rc == MSException.Code.BadVersion.getCode()) { - // bad version - callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION, - "Bad version provided to delete subscriptoin data of topic " + topic.toStringUtf8() - + " subscriberId " + subscriberId)); - return; - } else if (rc == MSException.Code.NoKey.getCode()) { - // no node - callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_SUBSCRIPTION_STATE, - "No subscription data found for (topic:" + topic.toStringUtf8() + ", subscriber:" - + subscriberId.toStringUtf8() + ").")); - return; - } else { - logErrorAndFinishOperation("Failed to delete subscription topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc, - StatusCode.SERVICE_DOWN); - } - } - }, ctx); - } - - private SubscriptionData value2SubscriptionData(Value value) throws ParseException, - UnsupportedEncodingException { - SubscriptionData.Builder builder = SubscriptionData.newBuilder(); - - byte[] stateData = value.getField(SUB_STATE_FIELD); - if (null != stateData) { - SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder(); - TextFormat.merge(new String(stateData, UTF8), stateBuilder); - SubscriptionState state = stateBuilder.build(); - builder.setState(state); - } - - byte[] prefsData = value.getField(SUB_PREFS_FIELD); - if (null != 
prefsData) { - SubscriptionPreferences.Builder preferencesBuilder = SubscriptionPreferences.newBuilder(); - TextFormat.merge(new String(prefsData, UTF8), preferencesBuilder); - SubscriptionPreferences preferences = preferencesBuilder.build(); - builder.setPreferences(preferences); - } - - return builder.build(); - } - - @Override - public void readSubscriptionData(final ByteString topic, final ByteString subscriberId, - final Callback<Versioned<SubscriptionData>> callback, Object ctx) { - String key = getSubscriptionKey(topic, subscriberId); - subTable.get(key, new MetastoreCallback<Versioned<Value>>() { - @Override - public void complete(int rc, Versioned<Value> value, Object ctx) { - if (rc == MSException.Code.NoKey.getCode()) { - callback.operationFinished(ctx, null); - return; - } - - if (rc != MSException.Code.OK.getCode()) { - logErrorAndFinishOperation( - "Could not read subscription data for topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8(), callback, ctx, rc); - return; - } - - try { - Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>( - value2SubscriptionData(value.getValue()), value.getVersion()); - if (logger.isDebugEnabled()) { - logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8() - + ", subscriberId: " + subscriberId.toStringUtf8() + ", data: " - + SubscriptionStateUtils.toString(subData.getValue()) + ", version: " - + subData.getVersion()); - } - callback.operationFinished(ctx, subData); - } catch (ParseException e) { - StringBuilder sb = new StringBuilder(); - sb.append("Failed to deserialize subscription data for topic:").append(topic.toStringUtf8()) - .append(", subscriberId: ").append(subscriberId.toStringUtf8()); - String msg = sb.toString(); - logger.error(msg, e); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - } catch (UnsupportedEncodingException uee) { - StringBuilder sb = new StringBuilder(); - 
sb.append("Subscription data for topic: ").append(topic.toStringUtf8()) - .append(", subscriberId: ").append(subscriberId.toStringUtf8()) - .append(" is not UFT-8 encoded"); - String msg = sb.toString(); - logger.error(msg, uee); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - } - } - }, ctx); - } - - private String getSubscriptionPrefix(ByteString topic, char sep) { - return new StringBuilder(topic.toStringUtf8()).append(sep).toString(); - } - - private void readSubscriptions(final ByteString topic, final int keyLength, final MetastoreCursor cursor, - final Map<ByteString, Versioned<SubscriptionData>> topicSubs, - final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) { - if (!cursor.hasMoreEntries()) { - callback.operationFinished(ctx, topicSubs); - return; - } - ReadEntriesCallback readCb = new ReadEntriesCallback() { - @Override - public void complete(int rc, Iterator<MetastoreTableItem> items, Object ctx) { - if (rc != MSException.Code.OK.getCode()) { - logErrorAndFinishOperation("Could not read subscribers for cursor " + cursor, - callback, ctx, rc); - return; - } - while (items.hasNext()) { - MetastoreTableItem item = items.next(); - final ByteString subscriberId = ByteString.copyFromUtf8(item.getKey().substring(keyLength)); - try { - Versioned<Value> vv = item.getValue(); - Versioned<SubscriptionData> subData = new Versioned<SubscriptionData>( - value2SubscriptionData(vv.getValue()), vv.getVersion()); - topicSubs.put(subscriberId, subData); - } catch (ParseException e) { - StringBuilder sb = new StringBuilder(); - sb.append("Failed to deserialize subscription data for topic: ") - .append(topic.toStringUtf8()).append(", subscriberId: ") - .append(subscriberId.toStringUtf8()); - String msg = sb.toString(); - logger.error(msg, e); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - return; - } catch (UnsupportedEncodingException e) { - StringBuilder sb = 
new StringBuilder(); - sb.append("Subscription data for topic: ").append(topic.toStringUtf8()) - .append(", subscriberId: ").append(subscriberId.toStringUtf8()) - .append(" is not UTF-8 encoded."); - String msg = sb.toString(); - logger.error(msg, e); - callback.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg)); - return; - } - } - readSubscriptions(topic, keyLength, cursor, topicSubs, callback, ctx); - } - }; - cursor.asyncReadEntries(cfg.getMetastoreMaxEntriesPerScan(), readCb, ctx); - } - - @Override - public void readSubscriptions(final ByteString topic, - final Callback<Map<ByteString, Versioned<SubscriptionData>>> callback, Object ctx) { - final String firstKey = getSubscriptionPrefix(topic, TOPIC_SUB_FIRST_SEPARATOR); - String lastKey = getSubscriptionPrefix(topic, TOPIC_SUB_LAST_SEPARATOR); - subTable.openCursor(firstKey, true, lastKey, true, Order.ASC, ALL_FIELDS, - new MetastoreCallback<MetastoreCursor>() { - @Override - public void complete(int rc, MetastoreCursor cursor, Object ctx) { - if (rc != MSException.Code.OK.getCode()) { - logErrorAndFinishOperation( - "Could not read subscribers for topic " + topic.toStringUtf8(), callback, ctx, - rc); - return; - } - - final Map<ByteString, Versioned<SubscriptionData>> topicSubs = - new ConcurrentHashMap<ByteString, Versioned<SubscriptionData>>(); - readSubscriptions(topic, firstKey.length(), cursor, topicSubs, callback, ctx); - } - }, ctx); - } - } - - /** - * callback finish operation with exception specify by code, regardless of - * the value of return code rc. - */ - private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc, - StatusCode code) { - logger.error(msg, MSException.create(MSException.Code.get(rc), "")); - callback.operationFailed(ctx, PubSubException.create(code, msg)); - } - - /** - * callback finish operation with corresponding PubSubException converted - * from return code rc. 
- */ - private static <T> void logErrorAndFinishOperation(String msg, Callback<T> callback, Object ctx, int rc) { - StatusCode code; - /* NoKey and ServiceDown map to dedicated status codes; every other return code is reported as UNEXPECTED_CONDITION. */ - if (rc == MSException.Code.NoKey.getCode()) { - code = StatusCode.NO_SUCH_TOPIC; - } else if (rc == MSException.Code.ServiceDown.getCode()) { - code = StatusCode.SERVICE_DOWN; - } else { - code = StatusCode.UNEXPECTED_CONDITION; - } - - logErrorAndFinishOperation(msg, callback, ctx, rc, code); - } - /** - * Format the hedwig metadata by cleaning the topic ownership table, the topic - * subscription table and the topic persistence info table, in that order. - * - * NOTE(review): when InterruptedException is caught it is wrapped in an IOException - * and the interrupt status of the thread is not restored. - * - * @param cfg - * Server configuration; supplies the max entries per scan passed to each table clean - * @param zk - * ZooKeeper handle; not used by this implementation - * @throws IOException - * when cleaning a table fails with an MSException or is interrupted - */ - @Override - public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException { - try { - int maxEntriesPerScan = cfg.getMetastoreMaxEntriesPerScan(); - - // clean topic ownership table. - logger.info("Cleaning topic ownership table ..."); - MetastoreUtils.cleanTable(ownerTable, maxEntriesPerScan); - logger.info("Cleaned topic ownership table successfully."); - - // clean topic subscription table. - logger.info("Cleaning topic subscription table ..."); - MetastoreUtils.cleanTable(subTable, maxEntriesPerScan); - logger.info("Cleaned topic subscription table successfully."); - - // clean topic persistence info table. 
- logger.info("Cleaning topic persistence info table ..."); - MetastoreUtils.cleanTable(persistTable, maxEntriesPerScan); - logger.info("Cleaned topic persistence info table successfully."); - } catch (MSException mse) { - throw new IOException("Exception when formatting hedwig metastore : ", mse); - } catch (InterruptedException ie) { - throw new IOException("Interrupted when formatting hedwig metastore : ", ie); - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java deleted file mode 100644 index 0bebd45..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.server.meta; - -import java.io.Closeable; -import java.util.Map; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.util.Callback; - -/** - * Manage subscription data: create, update, replace, delete and read the - * subscription data kept for a (topic, subscriber) pair. - */ -public interface SubscriptionDataManager extends Closeable { - - /** - * Create subscription data. - * - * @param topic - * Topic name - * @param subscriberId - * Subscriber id - * @param data - * Subscription data - * @param callback - * Callback when subscription state created. New version would be returned. - * {@link PubSubException.SubscriptionStateExistsException} is returned when subscription state - * existed before. - * @param ctx - * Context of the callback - */ - public void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data, - Callback<Version> callback, Object ctx); - - /** - * Whether the metadata manager supports partial update. - * - * @return true if the metadata manager supports partial update; - * false otherwise. - */ - public boolean isPartialUpdateSupported(); - - /** - * Update subscription data. - * - * @param topic - * Topic name - * @param subscriberId - * Subscriber id - * @param dataToUpdate - * Subscription data to update. It is partial data, which contains only - * the part of data to update. The implementation should not replace - * existing subscription data with <i>dataToUpdate</i> directly. - * E.g. if there is only state in it, you should update state only. - * @param version - * Current version of subscription data. - * @param callback - * Callback when subscription state updated. New version would be returned. 
- * {@link PubSubException.BadVersionException} is returned when version doesn't match, - * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state - * is found. - * @param ctx - * Context of the callback - */ - public void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToUpdate, - Version version, Callback<Version> callback, Object ctx); - - /** - * Replace subscription data. - * - * @param topic - * Topic name - * @param subscriberId - * Subscriber id - * @param dataToReplace - * Subscription data to replace the existing data with. - * @param version - * Current version of subscription data. - * @param callback - * Callback when subscription state updated. New version would be returned. - * {@link PubSubException.BadVersionException} is returned when version doesn't match, - * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state - * is found. - * @param ctx - * Context of the callback - */ - public void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData dataToReplace, - Version version, Callback<Version> callback, Object ctx); - - /** - * Remove subscription data. - * - * @param topic - * Topic name - * @param subscriberId - * Subscriber id - * @param version - * Current version of subscription data. - * @param callback - * Callback when subscription state deleted. - * {@link PubSubException.BadVersionException} is returned when version doesn't match, - * {@link PubSubException.NoSubscriptionStateException} is returned when no subscription state - * is found. - * @param ctx - * Context of the callback - */ - public void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, - Callback<Void> callback, Object ctx); - - /** - * Read subscription data with version. - * - * @param topic - * Topic Name - * @param subscriberId - * Subscriber id - * @param callback - * Callback when subscription data read. 
- * Null is returned when no subscription data is found. - * @param ctx - * Context of the callback - */ - public void readSubscriptionData(ByteString topic, ByteString subscriberId, - Callback<Versioned<SubscriptionData>> callback, Object ctx); - - /** - * Read all subscriptions of a topic. - * - * @param topic - * Topic name - * @param cb - * Callback to return subscriptions with version information - * @param ctx - * Context of the callback - */ - public void readSubscriptions(ByteString topic, Callback<Map<ByteString, Versioned<SubscriptionData>>> cb, - Object ctx); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java deleted file mode 100644 index f17011c..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicOwnershipManager.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.server.meta; - -import java.io.Closeable; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.hedwig.server.topics.HubInfo; -import org.apache.hedwig.util.Callback; - -/** - * Manage topic ownership: read, write and delete the owner info of a topic. - */ -public interface TopicOwnershipManager extends Closeable { - - /** - * Read owner information of a topic. - * - * @param topic - * Topic Name - * @param callback - * Callback to return hub info. If there is no owner info, return null; - * If there is data but not valid owner info, return a Versioned object with null hub info; - * If there is valid owner info, return versioned hub info. - * @param ctx - * Context of the callback - */ - public void readOwnerInfo(ByteString topic, Callback<Versioned<HubInfo>> callback, Object ctx); - - /** - * Write owner info for a specified topic. - * A new owner info would be created if none existed before. - * - * @param topic - * Topic Name - * @param owner - * Owner hub info - * @param version - * Current version of owner info - * If <code>version</code> is {@link Version#NEW}, create owner info. - * {@link org.apache.hedwig.exceptions.PubSubException.TopicOwnerInfoExistsException} is returned when - * owner info existed before. - * Otherwise, the owner info is updated only when - * provided version equals to its current version. - * {@link org.apache.hedwig.exceptions.PubSubException.BadVersionException} is returned when version doesn't match, - * {@link org.apache.hedwig.exceptions.PubSubException.NoTopicOwnerInfoException} is returned when no owner info - * found to update. - * @param callback - * Callback when owner info updated. New version would be returned if the write succeeds. 
- * @param ctx - * Context of the callback - */ - public void writeOwnerInfo(ByteString topic, HubInfo owner, Version version, - Callback<Version> callback, Object ctx); - - /** - * Delete owner info for a specified topic. 
- * - * @param topic - * Topic Name - * @param version - * Current version of owner info - * If <code>version</code> is {@link Version#ANY}, delete owner info no matter its current version. - * Otherwise, the owner info is deleted only when - * provided version equals to its current version. - * @param callback - * Callback when owner info deleted. - * {@link org.apache.hedwig.exceptions.PubSubException.NoTopicOwnerInfoException} is returned when there is no owner info. - * {@link org.apache.hedwig.exceptions.PubSubException.BadVersionException} is returned when version doesn't match. - * @param ctx - * Context of the callback. - */ - public void deleteOwnerInfo(ByteString topic, Version version, - Callback<Void> callback, Object ctx); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java deleted file mode 100644 index 69ee709..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/meta/TopicPersistenceManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.server.meta; - -import java.io.Closeable; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.util.Callback; - -/** - * Manage topic persistence metadata: read, write and delete the ledger ranges of a topic. - */ -public interface TopicPersistenceManager extends Closeable { - - /** - * Read persistence info of a specified topic. - * - * @param topic - * Topic Name - * @param callback - * Callback when persistence info is read. - * If no persistence info found, return null. - * @param ctx - * Context of the callback - */ - public void readTopicPersistenceInfo(ByteString topic, - Callback<Versioned<LedgerRanges>> callback, Object ctx); - - /** - * Update persistence info of a specified topic. - * - * @param topic - * Topic name - * @param ranges - * Persistence info - * @param version - * Current version of persistence info. - * If <code>version</code> is {@link Version#NEW}, create persistence info; - * {@link PubSubException.TopicPersistenceInfoExistsException} is returned when - * persistence info existed before. - * Otherwise, the persistence info is updated only when - * provided version equals to its current version. - * {@link PubSubException.BadVersionException} is returned when version doesn't match, - * {@link PubSubException.NoTopicPersistenceInfoException} is returned when no - * persistence info found to update. - * @param callback - * Callback when persistence info updated. New version would be returned. 
- * @param ctx - * Context of the callback - */ - public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version, - Callback<Version> callback, Object ctx); - - /** - * Delete persistence info of a specified topic. - * Currently used in test cases. - * - * @param topic - * Topic name - * @param version - * Current version of persistence info - * If <code>version</code> is {@link Version#ANY}, delete persistence info no matter its current version. - * Otherwise, the persistence info is deleted only when - * provided version equals to its current version. - * @param callback - * Callback when the deletion completes. - * {@link PubSubException.NoTopicPersistenceInfoException} is returned when no persistence - * info is found to delete. - * {@link PubSubException.BadVersionException} is returned when version doesn't match. - * @param ctx - * Context of the callback - */ - public void deleteTopicPersistenceInfo(ByteString topic, Version version, - Callback<Void> callback, Object ctx); - -}