http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java deleted file mode 100644 index 898e8b1..0000000 --- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.protoextensions; - -import java.util.HashMap; -import java.util.Map; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MapUtils { - - static final Logger logger = LoggerFactory.getLogger(MapUtils.class); - - public static String toString(PubSubProtocol.Map map) { - StringBuilder sb = new StringBuilder(); - int numEntries = map.getEntriesCount(); - for (int i=0; i<numEntries; i++) { - PubSubProtocol.Map.Entry entry = map.getEntries(i); - String key = entry.getKey(); - ByteString value = entry.getValue(); - sb.append(key).append('=').append(value.toStringUtf8()); - if (i != (numEntries - 1)) { - sb.append(','); - } - } - return sb.toString(); - } - - public static Map<String, ByteString> buildMap(PubSubProtocol.Map protoMap) { - Map<String, ByteString> javaMap = new HashMap<String, ByteString>(); - - int numEntries = protoMap.getEntriesCount(); - for (int i=0; i<numEntries; i++) { - PubSubProtocol.Map.Entry entry = protoMap.getEntries(i); - String key = entry.getKey(); - if (javaMap.containsKey(key)) { - ByteString preValue = javaMap.get(key); - logger.warn("Key " + key + " has already been defined as value : " + preValue.toStringUtf8()); - } else { - javaMap.put(key, entry.getValue()); - } - } - return javaMap; - } - - public static PubSubProtocol.Map.Builder buildMapBuilder(Map<String, ByteString> javaMap) { - PubSubProtocol.Map.Builder mapBuilder = PubSubProtocol.Map.newBuilder(); - - for (Map.Entry<String, ByteString> entry : javaMap.entrySet()) { - mapBuilder.addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(entry.getKey()) - .setValue(entry.getValue())); - } - return mapBuilder; - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java deleted file mode 100644 index 9ceec26..0000000 --- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.protoextensions; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId; - -public class MessageIdUtils { - - public static String msgIdToReadableString(MessageSeqId seqId) { - StringBuilder sb = new StringBuilder(); - sb.append("local:"); - sb.append(seqId.getLocalComponent()); - - String separator = ";"; - for (RegionSpecificSeqId regionId : seqId.getRemoteComponentsList()) { - sb.append(separator); - sb.append(regionId.getRegion().toStringUtf8()); - sb.append(':'); - sb.append(regionId.getSeqId()); - } - return sb.toString(); - } - - public static Map<ByteString, RegionSpecificSeqId> inMapForm(MessageSeqId msi) { - Map<ByteString, RegionSpecificSeqId> map = new HashMap<ByteString, RegionSpecificSeqId>(); - - for (RegionSpecificSeqId lmsid : msi.getRemoteComponentsList()) { - map.put(lmsid.getRegion(), lmsid); - } - - return map; - } - - public static boolean areEqual(MessageSeqId m1, MessageSeqId m2) { - - if (m1.getLocalComponent() != m2.getLocalComponent()) { - return false; - } - - if (m1.getRemoteComponentsCount() != m2.getRemoteComponentsCount()) { - return false; - } - - Map<ByteString, RegionSpecificSeqId> m2map = inMapForm(m2); - - for (RegionSpecificSeqId lmsid1 : m1.getRemoteComponentsList()) { - RegionSpecificSeqId lmsid2 = m2map.get(lmsid1.getRegion()); - if (lmsid2 == null) { - return false; - } - if (lmsid1.getSeqId() != lmsid2.getSeqId()) { - return false; - } - } - - return true; - - } - - public static Message mergeLocalSeqId(Message.Builder messageBuilder, long localSeqId) { - MessageSeqId.Builder msidBuilder = MessageSeqId.newBuilder(messageBuilder.getMsgId()); - 
msidBuilder.setLocalComponent(localSeqId); - messageBuilder.setMsgId(msidBuilder); - return messageBuilder.build(); - } - - public static Message mergeLocalSeqId(Message orginalMessage, long localSeqId) { - return mergeLocalSeqId(Message.newBuilder(orginalMessage), localSeqId); - } - - /** - * Compares two seq numbers represented as lists of longs. - * - * @param l1 - * @param l2 - * @return 1 if the l1 is greater, 0 if they are equal, -1 if l2 is greater - * @throws UnexpectedConditionException - * If the lists are of unequal length - */ - public static int compare(List<Long> l1, List<Long> l2) throws UnexpectedConditionException { - if (l1.size() != l2.size()) { - throw new UnexpectedConditionException("Seq-ids being compared have different sizes: " + l1.size() - + " and " + l2.size()); - } - - for (int i = 0; i < l1.size(); i++) { - long v1 = l1.get(i); - long v2 = l2.get(i); - - if (v1 == v2) { - continue; - } - - return v1 > v2 ? 1 : -1; - } - - // All components equal - return 0; - } - - /** - * Returns the element-wise vector maximum of the two vectors id1 and id2, - * if we imagine them to be sparse representations of vectors. - */ - public static void takeRegionMaximum(MessageSeqId.Builder newIdBuilder, MessageSeqId id1, MessageSeqId id2) { - Map<ByteString, RegionSpecificSeqId> id2Map = MessageIdUtils.inMapForm(id2); - - for (RegionSpecificSeqId rrsid1 : id1.getRemoteComponentsList()) { - ByteString region = rrsid1.getRegion(); - - RegionSpecificSeqId rssid2 = id2Map.get(region); - - if (rssid2 == null) { - newIdBuilder.addRemoteComponents(rrsid1); - continue; - } - - newIdBuilder.addRemoteComponents((rrsid1.getSeqId() > rssid2.getSeqId()) ? 
rrsid1 : rssid2); - - // remove from map - id2Map.remove(region); - } - - // now take the remaining components in the map and add them - for (RegionSpecificSeqId rssid2 : id2Map.values()) { - newIdBuilder.addRemoteComponents(rssid2); - } - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java deleted file mode 100644 index 5a9cdf7..0000000 --- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.protoextensions; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.protocol.PubSubProtocol.StatusCode; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse; - -public class PubSubResponseUtils { - - /** - * Change here if bumping up the version number that the server sends back - */ - public final static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE; - - static PubSubResponse.Builder getBasicBuilder(StatusCode status) { - return PubSubResponse.newBuilder().setProtocolVersion(serverVersion).setStatusCode(status); - } - - public static PubSubResponse getSuccessResponse(long txnId) { - return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId).build(); - } - - public static PubSubResponse getSuccessResponse(long txnId, ResponseBody respBody) { - return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId) - .setResponseBody(respBody).build(); - } - - public static PubSubResponse getResponseForException(PubSubException e, long txnId) { - return getBasicBuilder(e.getCode()).setStatusMsg(e.getMessage()).setTxnId(txnId).build(); - } - - public static PubSubResponse getResponseForSubscriptionEvent(ByteString topic, - ByteString subscriberId, - SubscriptionEvent event) { - SubscriptionEventResponse.Builder eventBuilder = - SubscriptionEventResponse.newBuilder().setEvent(event); - ResponseBody.Builder respBuilder = - ResponseBody.newBuilder().setSubscriptionEvent(eventBuilder); - PubSubResponse response = PubSubResponse.newBuilder() - .setProtocolVersion(ProtocolVersion.VERSION_ONE) - .setStatusCode(StatusCode.SUCCESS).setTxnId(0) - 
.setTopic(topic).setSubscriberId(subscriberId) - .setResponseBody(respBuilder).build(); - return response; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java deleted file mode 100644 index e195ace..0000000 --- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.protoextensions; - -import java.util.HashMap; -import java.util.Map; - -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SubscriptionStateUtils { - - static final Logger logger = LoggerFactory.getLogger(SubscriptionStateUtils.class); - - // For now, to differentiate hub subscribers from local ones, the - // subscriberId will be prepended with a hard-coded prefix. Local - // subscribers will validate that the subscriberId used cannot start with - // this prefix. This is only used internally by the hub subscribers. - public static final String HUB_SUBSCRIBER_PREFIX = "__"; - - public static SubscriptionData parseSubscriptionData(byte[] data) - throws InvalidProtocolBufferException { - try { - return SubscriptionData.parseFrom(data); - } catch (InvalidProtocolBufferException ex) { - logger.info("Failed to parse data as SubscriptionData. 
Fall backward to parse it as SubscriptionState for backward compatability."); - // backward compability - SubscriptionState state = SubscriptionState.parseFrom(data); - return SubscriptionData.newBuilder().setState(state).build(); - } - } - - public static String toString(SubscriptionData data) { - StringBuilder sb = new StringBuilder(); - if (data.hasState()) { - sb.append("State : { ").append(toString(data.getState())).append(" };"); - } - if (data.hasPreferences()) { - sb.append("Preferences : { ").append(toString(data.getPreferences())).append(" };"); - } - return sb.toString(); - } - - public static String toString(SubscriptionState state) { - StringBuilder sb = new StringBuilder(); - sb.append("consumeSeqId: " + MessageIdUtils.msgIdToReadableString(state.getMsgId())); - return sb.toString(); - } - - public static String toString(SubscriptionPreferences preferences) { - StringBuilder sb = new StringBuilder(); - sb.append("System Preferences : ["); - if (preferences.hasMessageBound()) { - sb.append("(messageBound=").append(preferences.getMessageBound()) - .append(")"); - } - sb.append("]"); - if (preferences.hasOptions()) { - sb.append(", Customized Preferences : ["); - sb.append(MapUtils.toString(preferences.getOptions())); - sb.append("]"); - } - return sb.toString(); - } - - public static boolean isHubSubscriber(ByteString subscriberId) { - return subscriberId.toStringUtf8().startsWith(HUB_SUBSCRIBER_PREFIX); - } - - public static Map<String, ByteString> buildUserOptions(SubscriptionPreferences preferences) { - if (preferences.hasOptions()) { - return MapUtils.buildMap(preferences.getOptions()); - } else { - return new HashMap<String, ByteString>(); - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto ---------------------------------------------------------------------- diff --git a/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto 
b/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto deleted file mode 100644 index c31f0a6..0000000 --- a/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -option java_package = "org.apache.hedwig.protocol"; -option optimize_for = SPEED; -package Hedwig; - -enum ProtocolVersion{ - VERSION_ONE = 1; -} - -// common structure to store header or properties -message Map { - message Entry { - optional string key = 1; - optional bytes value = 2; - } - repeated Entry entries = 1; -} - -// message header -message MessageHeader { - // user customized fields used for message filter - optional Map properties = 1; - // following are system properties in message header - optional string messageType = 2; -} - -/* - * this is the structure that will be serialized - */ -message Message { - required bytes body = 1; - optional bytes srcRegion = 2; - optional MessageSeqId msgId = 3; - // message header - optional MessageHeader header = 4; -} - -message RegionSpecificSeqId { - required bytes region = 1; - required uint64 seqId = 2; -} - -message MessageSeqId{ - optional uint64 localComponent = 1; - repeated RegionSpecificSeqId remoteComponents = 2; -} - -enum OperationType{ - PUBLISH = 0; - SUBSCRIBE = 1; - CONSUME = 2; - UNSUBSCRIBE = 3; - - //the following two are only used for the hedwig proxy - START_DELIVERY = 4; - STOP_DELIVERY = 5; - // end for requests only used for hedwig proxy - - CLOSESUBSCRIPTION = 6; -} - -/* A PubSubRequest is just a union of the various request types, with - * an enum telling us which type it is. The same can also be done through - * extensions. We need one request type that we will deserialize into on - * the server side. 
- */ -message PubSubRequest{ - - required ProtocolVersion protocolVersion = 1; - required OperationType type = 2; - repeated bytes triedServers = 3; - required uint64 txnId = 4; - optional bool shouldClaim = 5; - required bytes topic = 6; - //any authentication stuff and other general stuff here - - - /* one entry for each type of request */ - optional PublishRequest publishRequest = 52; - optional SubscribeRequest subscribeRequest = 53; - optional ConsumeRequest consumeRequest = 54; - optional UnsubscribeRequest unsubscribeRequest = 55; - optional StopDeliveryRequest stopDeliveryRequest = 56; - optional StartDeliveryRequest startDeliveryRequest = 57; - optional CloseSubscriptionRequest closeSubscriptionRequest = 58; -} - - - -message PublishRequest{ - required Message msg = 2; -} - -// record all preferences for a subscription, -// would be serialized to be stored in meta store -message SubscriptionPreferences { - // user customized subscription options - optional Map options = 1; - - /// - /// system defined options - /// - - // message bound - optional uint32 messageBound = 2; - // server-side message filter - optional string messageFilter = 3; - // message window size, this is the maximum number of messages - // which will be delivered without being consumed - optional uint32 messageWindowSize = 4; -} - -message SubscribeRequest{ - required bytes subscriberId = 2; - - enum CreateOrAttach{ - CREATE = 0; - ATTACH = 1; - CREATE_OR_ATTACH = 2; - }; - optional CreateOrAttach createOrAttach = 3 [default = CREATE_OR_ATTACH]; - - // wait for cross-regional subscriptions to be established before returning - optional bool synchronous = 4 [default = false]; - // @Deprecated. 
set message bound in SubscriptionPreferences - optional uint32 messageBound = 5; - - // subscription options - optional SubscriptionPreferences preferences = 6; - - // force attach subscription which would kill existed channel - // this option doesn't need to be persisted - optional bool forceAttach = 7 [default = false]; -} - -// used in client only -// options are stored in SubscriptionPreferences structure -message SubscriptionOptions { - // force attach subscription which would kill existed channel - // this option doesn't need to be persisted - optional bool forceAttach = 1 [default = false]; - optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH]; - optional uint32 messageBound = 3 [default = 0]; - // user customized subscription options - optional Map options = 4; - // server-side message filter - optional string messageFilter = 5; - // message window size, this is the maximum number of messages - // which will be delivered without being consumed - optional uint32 messageWindowSize = 6; - // enable resubscribe - optional bool enableResubscribe = 7 [default = true]; -} - -message ConsumeRequest{ - required bytes subscriberId = 2; - required MessageSeqId msgId = 3; - //the msgId is cumulative: all messages up to this id are marked as consumed -} - -message UnsubscribeRequest{ - required bytes subscriberId = 2; -} - -message CloseSubscriptionRequest { - required bytes subscriberId = 2; -} - -message StopDeliveryRequest{ - required bytes subscriberId = 2; -} - -message StartDeliveryRequest{ - required bytes subscriberId = 2; -} - -// Identify an event happened for a subscription -enum SubscriptionEvent { - // topic has changed ownership (hub server down or topic released) - TOPIC_MOVED = 1; - // subscription is force closed by other subscribers - SUBSCRIPTION_FORCED_CLOSED = 2; -} - -// a response carries an event for a subscription sent to client -message SubscriptionEventResponse { - optional SubscriptionEvent event = 1; -} - 
-message PubSubResponse{ - required ProtocolVersion protocolVersion = 1; - required StatusCode statusCode = 2; - required uint64 txnId = 3; - - optional string statusMsg = 4; - //in case of a status code of NOT_RESPONSIBLE_FOR_TOPIC, the status - //message will contain the name of the host actually responsible - //for the topic - - //the following fields are sent in delivered messages - optional Message message = 5; - optional bytes topic = 6; - optional bytes subscriberId = 7; - - // the following fields are sent by other requests - optional ResponseBody responseBody = 8; -} - -message PublishResponse { - // If the request was a publish request, this was the message Id of the published message. - required MessageSeqId publishedMsgId = 1; -} - -message SubscribeResponse { - optional SubscriptionPreferences preferences = 2; -} - -message ResponseBody { - optional PublishResponse publishResponse = 1; - optional SubscribeResponse subscribeResponse = 2; - optional SubscriptionEventResponse subscriptionEvent = 3; -} - - -enum StatusCode{ - SUCCESS = 0; - - //client-side errors (4xx) - MALFORMED_REQUEST = 401; - NO_SUCH_TOPIC = 402; - CLIENT_ALREADY_SUBSCRIBED = 403; - CLIENT_NOT_SUBSCRIBED = 404; - COULD_NOT_CONNECT = 405; - TOPIC_BUSY = 406; - RESUBSCRIBE_EXCEPTION = 407; - - //server-side errors (5xx) - NOT_RESPONSIBLE_FOR_TOPIC = 501; - SERVICE_DOWN = 502; - UNCERTAIN_STATE = 503; - INVALID_MESSAGE_FILTER = 504; - - //server-side meta manager errors (52x) - BAD_VERSION = 520; - NO_TOPIC_PERSISTENCE_INFO = 521; - TOPIC_PERSISTENCE_INFO_EXISTS = 522; - NO_SUBSCRIPTION_STATE = 523; - SUBSCRIPTION_STATE_EXISTS = 524; - NO_TOPIC_OWNER_INFO = 525; - TOPIC_OWNER_INFO_EXISTS = 526; - - //For all unexpected error conditions - UNEXPECTED_CONDITION = 600; - - COMPOSITE = 700; -} - -//What follows is not the server client protocol, but server-internal structures that are serialized in ZK -//They should eventually be moved into the server - -message SubscriptionState { - required 
MessageSeqId msgId = 1; - // @Deprecated. - // It is a bad idea to put fields that don't change frequently - // together with fields that change frequently - // so move it to subscription preferences structure - optional uint32 messageBound = 2; -} - -message SubscriptionData { - optional SubscriptionState state = 1; - optional SubscriptionPreferences preferences = 2; -} - -message LedgerRange{ - required uint64 ledgerId = 1; - optional MessageSeqId endSeqIdIncluded = 2; - optional uint64 startSeqIdIncluded = 3; -} - -message LedgerRanges{ - repeated LedgerRange ranges = 1; -} - -message ManagerMeta { - required string managerImpl = 2; - required uint32 managerVersion = 3; -} - -message HubInfoData { - required string hostname = 2; - required uint64 czxid = 3; -} - -message HubLoadData { - required uint64 numTopics = 2; -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/hedwig-protocol/src/main/resources/findbugsExclude.xml b/hedwig-protocol/src/main/resources/findbugsExclude.xml deleted file mode 100644 index 27cd339..0000000 --- a/hedwig-protocol/src/main/resources/findbugsExclude.xml +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- See the License for the specific language governing permissions and - limitations under the License. -//--> -<FindBugsFilter> - <Match> - <!-- generated code, we can't be held responsible for findbugs in it //--> - <Class name="~org\.apache\.hedwig\.protocol\.PubSubProtocol.*" /> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/bin/hedwig ---------------------------------------------------------------------- diff --git a/hedwig-server/bin/hedwig b/hedwig-server/bin/hedwig deleted file mode 100755 index a2ff83b..0000000 --- a/hedwig-server/bin/hedwig +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env bash -# -#/** -# * Copyright 2007 The Apache Software Foundation -# * -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. 
-# */ - -# check if net.ipv6.bindv6only is set to 1 -bindv6only=$(/sbin/sysctl -n net.ipv6.bindv6only 2> /dev/null) -if [ -n "$bindv6only" ] && [ "$bindv6only" -eq "1" ] -then - echo "Error: \"net.ipv6.bindv6only\" is set to 1 - Java networking could be broken" - echo "For more info (the following page also applies to hedwig): http://wiki.apache.org/hadoop/HadoopIPv6" - exit 1 -fi - -# See the following page for extensive details on setting -# up the JVM to accept JMX remote management: -# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html -# by default we allow local JMX connections -if [ "x$JMXLOCALONLY" = "x" ] -then - JMXLOCALONLY=false -fi - -if [ "x$JMXDISABLE" = "x" ] -then - echo "JMX enabled by default" >&2 - # for some reason these two options are necessary on jdk6 on Ubuntu - # accord to the docs they are not necessary, but otw jconsole cannot - # do a local attach - JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY" -else - echo "JMX disabled by user request" >&2 -fi - -BINDIR=`dirname "$0"` -HW_HOME=`cd $BINDIR/..;pwd` - -DEFAULT_CONF=$HW_HOME/conf/hw_server.conf -DEFAULT_REGION_CLIENT_CONF=$HW_HOME/conf/hw_region_client.conf -DEFAULT_LOG_CONF=$HW_HOME/conf/log4j.properties - -. $HW_HOME/conf/hwenv.sh - -# Check for the java to use -if [[ -z $JAVA_HOME ]]; then - JAVA=$(which java) - if [ $? = 0 ]; then - echo "JAVA_HOME not set, using java from PATH. ($JAVA)" - else - echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2 - exit 1 - fi -else - JAVA=$JAVA_HOME/bin/java -fi - -RELEASE_JAR=`ls $HW_HOME/hedwig-server-*.jar 2> /dev/null | grep -v tests | tail -1` - -if [ $? == 0 ]; then - HEDWIG_JAR=$RELEASE_JAR -fi - -BUILT_JAR=`ls $HW_HOME/target/hedwig-server-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? != 0 ] && [ ! 
-e "$HEDWIG_JAR" ]; then - echo "\nCouldn't find hedwig jar."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; -elif [ -e "$BUILT_JAR" ]; then - HEDWIG_JAR=$BUILT_JAR -fi - -add_maven_deps_to_classpath() { - MVN="mvn" - if [ "$MAVEN_HOME" != "" ]; then - MVN=${MAVEN_HOME}/bin/mvn - fi - - # Need to generate classpath from maven pom. This is costly so generate it - # and cache it. Save the file into our target dir so a mvn clean will get - # clean it up and force us create a new one. - f="${HW_HOME}/target/cached_classpath.txt" - if [ ! -f "${f}" ] - then - ${MVN} -f "${HW_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null - fi - HEDWIG_CLASSPATH=${CLASSPATH}:`cat "${f}"` -} - -if [ -d "$HW_HOME/lib" ]; then - for i in $HW_HOME/lib/*.jar; do - HEDWIG_CLASSPATH=$HEDWIG_CLASSPATH:$i - done -else - add_maven_deps_to_classpath -fi - -hedwig_help() { - cat <<EOF -Usage: hedwig <command> -where command is one of: - server Run the hedwig server - console Run the hedwig admin console - help This help message - -or command is the full name of a class with a defined main() method. - -Environment variables: - HEDWIG_SERVER_CONF Hedwig server configuration file (default $DEFAULT_CONF) - HEDWIG_REGION_CLIENT_CONF Configuration file for the hedwig client used by the - region manager (default $DEFAULT_REGION_CLIENT_CONF) - HEDWIG_CONSOLE_SERVER_CONF Server part configuration for hedwig console, - used for metadata management (defaults to HEDWIG_SERVER_CONF) - HEDWIG_CONSOLE_CLIENT_CONF Client part configuration for hedwig console, - used for interacting with hub server. 
- HEDWIG_LOG_CONF Log4j configuration file (default $DEFAULT_LOG_CONF) - HEDWIG_ROOT_LOGGER Root logger for hedwig - HEDWIG_LOG_DIR Log directory to store log files for hedwig server - HEDWIG_LOG_FILE Log file name - HEDWIG_EXTRA_OPTS Extra options to be passed to the jvm - -These variable can also be set in conf/hwenv.sh -EOF -} - -# if no args specified, show usage -if [ $# = 0 ]; then - hedwig_help; - exit 1; -fi - -# get arguments -COMMAND=$1 -shift - -if [ -z "$HEDWIG_SERVER_CONF" ]; then - HEDWIG_SERVER_CONF=$DEFAULT_CONF; -fi - -if [ -z "$HEDWIG_REGION_CLIENT_CONF" ]; then - HEDWIG_REGION_CLIENT_CONF=$DEFAULT_REGION_CLIENT_CONF; -fi - -if [ -z "$HEDWIG_LOG_CONF" ]; then - HEDWIG_LOG_CONF=$DEFAULT_LOG_CONF -fi - -HEDWIG_CLASSPATH="$HEDWIG_JAR:$HEDWIG_CLASSPATH" - -if [ "$HEDWIG_LOG_CONF" != "" ]; then - HEDWIG_CLASSPATH="`dirname $HEDWIG_LOG_CONF`:$HEDWIG_CLASSPATH" - OPTS="$OPTS -Dlog4j.configuration=`basename $HEDWIG_LOG_CONF`" -fi -OPTS="-cp $HEDWIG_CLASSPATH $OPTS $HEDWIG_EXTRA_OPTS" - -# Disable ipv6 as it can cause issues -OPTS="$OPTS -Djava.net.preferIPv4Stack=true" - -# log directory & file -HEDWIG_ROOT_LOGGER=${HEDWIG_ROOT_LOGGER:-"INFO,CONSOLE"} -HEDWIG_LOG_DIR=${HEDWIG_LOG_DIR:-"$HW_HOME/logs"} -HEDWIG_LOG_FILE=${HEDWIG_LOG_FILE:-"hedwig-server.log"} - -# Configure log configuration system properties -OPTS="$OPTS -Dhedwig.root.logger=$HEDWIG_ROOT_LOGGER" -OPTS="$OPTS -Dhedwig.log.dir=$HEDWIG_LOG_DIR" -OPTS="$OPTS -Dhedwig.log.file=$HEDWIG_LOG_FILE" - -# Change to HW_HOME to support relative paths -cd "$BK_HOME" -if [ $COMMAND == "server" ]; then - exec $JAVA $OPTS $JMX_ARGS org.apache.hedwig.server.netty.PubSubServer $HEDWIG_SERVER_CONF $HEDWIG_REGION_CLIENT_CONF $@ -elif [ $COMMAND == "console" ]; then - # hedwig console configuration server part - if [ -z "$HEDWIG_CONSOLE_SERVER_CONF" ]; then - HEDWIG_CONSOLE_SERVER_CONF=$HEDWIG_SERVER_CONF - fi - # hedwig console configuration client part - if [ -n "$HEDWIG_CONSOLE_CLIENT_CONF" ]; then - 
HEDWIG_CONSOLE_CLIENT_OPTIONS="-client-cfg $HEDWIG_CONSOLE_CLIENT_CONF" - fi - exec $JAVA $OPTS org.apache.hedwig.admin.console.HedwigConsole -server-cfg $HEDWIG_CONSOLE_SERVER_CONF $HEDWIG_CONSOLE_CLIENT_OPTIONS $@ -elif [ $COMMAND == "help" ]; then - hedwig_help; -else - exec $JAVA $OPTS $COMMAND $@ -fi - - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/bin/hedwig-daemon.sh ---------------------------------------------------------------------- diff --git a/hedwig-server/bin/hedwig-daemon.sh b/hedwig-server/bin/hedwig-daemon.sh deleted file mode 100755 index 73eac6f..0000000 --- a/hedwig-server/bin/hedwig-daemon.sh +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env bash -# -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -usage() { - cat <<EOF -Usage: hedwig-daemon.sh (start|stop) <command> <args...> -where command is one of: - server Run the hedwig server -EOF -} - - -BINDIR=`dirname "$0"` -HEDWIG_HOME=`cd $BINDIR/..;pwd` - -if [ -f $HEDWIG_HOME/conf/hwenv.sh ] -then - . 
$HEDWIG_HOME/conf/hwenv.sh -fi - -HEDWIG_LOG_DIR=${HEDWIG_LOG_DIR:-"$HEDWIG_HOME/logs"} - -HEDWIG_ROOT_LOGGER=${HEDWIG_ROOT_LOGGER:-'INFO,ROLLINGFILE'} - -HEDWIG_STOP_TIMEOUT=${HEDWIG_STOP_TIMEOUT:-30} - -HEDWIG_PID_DIR=${HEDWIG_PID_DIR:-$HEDWIG_HOME/bin} - -if [ $# -lt 2 ] -then - echo "Error: no enough arguments provided." - usage - exit 1 -fi - -startStop=$1 -shift -command=$1 -shift - -case $command in - (server) - echo "doing $startStop $command ..." - ;; - (*) - echo "Error: unknown service name $command" - usage - exit 1 - ;; -esac - -export HEDWIG_LOG_DIR=$HEDWIG_LOG_DIR -export HEDWIG_ROOT_LOGGER=$HEDWIG_ROOT_LOGGER -export HEDWIG_LOG_FILE=hedwig-$command-$HOSTNAME.log - -pid=$HEDWIG_PID_DIR/hedwig-$command.pid -out=$HEDWIG_LOG_DIR/hedwig-$command-$HOSTNAME.out -logfile=$HEDWIG_LOG_DIR/$HEDWIG_LOG_FILE - -rotate_out_log () -{ - log=$1; - num=5; - if [ -n "$2" ]; then - num=$2 - fi - if [ -f "$log" ]; then # rotate logs - while [ $num -gt 1 ]; do - prev=`expr $num - 1` - [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num" - num=$prev - done - mv "$log" "$log.$num"; - fi -} - -mkdir -p "$HEDWIG_LOG_DIR" - -case $startStop in - (start) - if [ -f $pid ]; then - if kill -0 `cat $pid` > /dev/null 2>&1; then - echo $command running as process `cat $pid`. Stop it first. - exit 1 - fi - fi - - rotate_out_log $out - echo starting $command, logging to $logfile - hedwig=$HEDWIG_HOME/bin/hedwig - nohup $hedwig $command "$@" > "$out" 2>&1 < /dev/null & - echo $! > $pid - sleep 1; head $out - sleep 2; - if ! ps -p $! > /dev/null ; then - exit 1 - fi - ;; - - (stop) - if [ -f $pid ]; then - TARGET_PID=`cat $pid` - if kill -0 $TARGET_PID > /dev/null 2>&1; then - echo stopping $command - kill $TARGET_PID - - count=0 - location=$HEDWIG_LOG_DIR - while ps -p $TARGET_PID > /dev/null; - do - echo "Shutdown is in progress... Please wait..." 
- sleep 1 - count=`expr $count + 1` - - if [ "$count" = "$HEDWIG_STOP_TIMEOUT" ]; then - break - fi - done - - if [ "$count" != "$HEDWIG_STOP_TIMEOUT" ]; then - echo "Shutdown completed." - exit 0 - fi - - if kill -0 $TARGET_PID > /dev/null 2>&1; then - fileName=$location/$command.out - $JAVA_HOME/bin/jstack $TARGET_PID > $fileName - echo Thread dumps are taken for analysis at $fileName - echo forcefully stopping $command - kill -9 $TARGET_PID >/dev/null 2>&1 - echo Successfully stopped the process - fi - else - echo no $command to stop - fi - rm $pid - else - echo no $command to stop - fi - ;; - - (*) - usage - exit 1 - ;; -esac http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/hw_region_client.conf ---------------------------------------------------------------------- diff --git a/hedwig-server/conf/hw_region_client.conf b/hedwig-server/conf/hw_region_client.conf deleted file mode 100644 index 9a5592e..0000000 --- a/hedwig-server/conf/hw_region_client.conf +++ /dev/null @@ -1,42 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# This is the configuration file for the hedwig client used by the region manager - -# This parameter is a boolean flag indicating if communication with the -# server should be done via SSL for encryption. The Hedwig server hubs also -# need to be SSL enabled for this to work. -# ssl_enabled=false - -# The maximum message size in bytes -# max_message_size=2097152 - -# The maximum number of redirects we permit before signalling an error -# max_server_redirects=2 - -# A flag indicating whether the client library should automatically send -# consume messages to the server -# auto_send_consume_message_enabled=true - -# The number of messages we buffer before sending a consume message -# to the server -# consumed_messages_buffer_size=5 - -# Support for client side throttling. -# max_outstanding_messages=10 - -# The timeout in milliseconds before we error out any existing -# requests -# server_ack_response_timeout=30000 http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/hw_server.conf ---------------------------------------------------------------------- diff --git a/hedwig-server/conf/hw_server.conf b/hedwig-server/conf/hw_server.conf deleted file mode 100644 index 2ca2d54..0000000 --- a/hedwig-server/conf/hw_server.conf +++ /dev/null @@ -1,168 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -################################ -# ZooKeeper Settings -################################ - -# The ZooKeeper server host(s) for the Hedwig Server to use. -zk_host=localhost:2181 - -# The number of milliseconds of each tick in ZooKeeper. -zk_timeout=2000 - -################################ -# Hub Server Settings -################################ - -# Is the hub server running in standalone mode? -# Default is false. -standalone=false - -# The port at which the clients will connect. -server_port=4080 - -# The SSL port at which the clients will connect (only if SSL is enabled). -ssl_server_port=9876 - -# Flag indicating if the server should also operate in SSL mode. -ssl_enabled=false - -# Name of the SSL certificate if available as a resource. -# The certificate should be in pkcs12 format. -# cert_name= - -# Path to the SSL certificate if available as a file. -# The certificate should be in pkcs12 format. -# cert_path= - -# Password used for pkcs12 certificate. -# password= - -####################################### -# Publish and subscription parameters -####################################### -# Max Message Size that a hub server could accept -# max_message_size=1258291 - -# Message Sequence Interval to update subscription state to metadata store. -# Default is 50. -# consume_interval=50 - -# Time interval (in seconds) to release topic ownership. If the time interval -# is less than zero, the ownership will never be released automatically. -# Default is 0. -# retention_secs=0 - -# Time interval (in milliseconds) to run messages consumed timer task to -# delete those consumed ledgers in BookKeeper. -# messages_consumed_thread_run_interval=60000 - -# Default maximum number of messages which can be delivered to a subscriber -# without being consumed. We pause messages delivery to a subscriber when -# reaching the window size. 
Default is 0, which means we never pause messages -# delivery even if a subscriber consumes nothing and it doesn't set any subscriber -# specified message window size. -# default_message_window_size=0 - -# The maximum number of entries stored in a ledger. When the number of entries -# reaches this threshold, the hub server will open a new ledger to write. Default is 0. -# If it is set to 0, the hub server will keep using the same ledger to write entries unless -# the topic ownership changes. -# max_entries_per_ledger=0 - -################################ -# Region Related Settings -################################ - -# Region name that the hub server belongs to. -# region=standalone - -# Regions list of a Hedwig instance. -# The expected format for the regions parameter is Hostname:Port:SSLPort -# with spaces in between each of the regions. -# regions= - -# Whether to enable SSL connections between regions. -# (@Deprecated here. It is recommended to set in conf/hw_region_client.conf) -# Default is false. -# inter_region_ssl_enabled=false - -# Time interval (in milliseconds) to run thread to retry those failed -# remote subscriptions in asynchronous mode. Default is 120000. -# retry_remote_subscribe_thread_run_interval=120000 - -################################ -# ReadAhead Settings -################################ - -# Enable read ahead cache or not. If disabled, read requests -# would access BookKeeper directly. -# Default is true. -# readahead_enabled=true - -# Number of entries to read ahead. Default value is 10. -# readahead_count=10 - -# Max size of entries to read ahead. Default value is 4M. -# readahead_size=4194304 - -# Max memory used for ReadAhead Cache. -# Default value is the minimum of 2G and half of the JVM max memory. -# cache_size= - -# The backoff time (in milliseconds) to retry scans after failures. -# Default value is 1000. -# scan_backoff_ms=1000 - -# Sets the number of threads to be used for the read-ahead mechanism. 
-# Default is the number of cores as returned by a call to -# <code>Runtime.getRuntime().availableProcessors()</code>. -# num_readahead_cache_threads= - -# Set TTL for cache entries. Each time a new entry is added to the cache, -# expired cache entries are discarded. If the value is set -# to zero or less than zero, cache entries will not be evicted until the -# cache is full or the messages are already consumed. By default -# the value is zero. -# cache_entry_ttl= - -################################ -# Metadata Settings -################################ - -# ZooKeeper prefix to store metadata if using ZooKeeper as metadata store. -# Default value is "/hedwig". -# zk_prefix=/hedwig - -# Enable metadata manager based topic manager. Default is false. -# metadata_manager_based_topic_manager_enabled=false - -# Class name of metadata manager factory used to store metadata. -# Default is null. -# metadata_manager_factory_class= - -################################ -# BookKeeper Settings -################################ - -# Ensemble size of a ledger in BookKeeper. Default is 3. -# bk_ensemble_size=3 - -# Write quorum size for a ledger in BookKeeper. Default is 2. -# bk_write_quorum_size=2 - -# Ack quorum size for a ledger in BookKeeper. Default is 2. -# bk_ack_quorum_size=2 http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/hwenv.sh ---------------------------------------------------------------------- diff --git a/hedwig-server/conf/hwenv.sh b/hedwig-server/conf/hwenv.sh deleted file mode 100644 index 8d379b6..0000000 --- a/hedwig-server/conf/hwenv.sh +++ /dev/null @@ -1,56 +0,0 @@ -#!/bin/sh -# -#/** -# * Copyright 2007 The Apache Software Foundation -# * -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. 
The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -# Set JAVA_HOME here to override the environment setting -# JAVA_HOME= - -# default settings for starting hedwig -# HEDWIG_SERVER_CONF= - -# default settings for the region manager's hedwig client -# HEDWIG_REGION_CLIENT_CONF= - -# default settings for the region manager's hedwig client -# HEDWIG_CLIENT_CONF= - -# Server part configuration for hedwig console, -# used for metadata management -# HEDWIG_CONSOLE_SERVER_CONF= - -# Client part configuration for hedwig console, -# used for interacting with hub server. -# HEDWIG_CONSOLE_CLIENT_CONF= - -# Log4j configuration file -# HEDWIG_LOG_CONF= - -# Logs location -# HEDWIG_LOG_DIR= - -# Extra options to be passed to the jvm -# HEDWIG_EXTRA_OPTS= - -#Folder where the hedwig server PID file should be stored -#HEDWIG_PID_DIR= - -#Wait time (in seconds) before forcefully killing the hedwig server instance, if the stop is not successful -#HEDWIG_STOP_TIMEOUT= http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/hedwig-server/conf/log4j.properties b/hedwig-server/conf/log4j.properties deleted file mode 100644 index c0f1c49..0000000 --- a/hedwig-server/conf/log4j.properties +++ /dev/null @@ -1,78 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# - -# -# Hedwig Logging Configuration -# - -# Format is "<default threshold> (, <appender>)+" - -# DEFAULT: console appender only -# Define some default values that can be overridden by system properties -hedwig.root.logger=WARN,CONSOLE -hedwig.log.dir=. -hedwig.log.file=hedwig-server.log -hedwig.trace.file=hedwig-trace.log - -log4j.rootLogger=${hedwig.root.logger} - -# Example with rolling log file -#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE - -# Example with rolling log file and tracing -#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE - -# -# Log INFO level and above messages to the console -# -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=INFO -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n - -# -# Add ROLLINGFILE to rootLogger to get log file output -# Log INFO level and above messages to a log file -log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender -log4j.appender.ROLLINGFILE.Threshold=INFO -log4j.appender.ROLLINGFILE.File=${hedwig.log.dir}/${hedwig.log.file} -log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - 
[%t:%C{1}@%L] - %m%n - -# Max log file size of 10MB -#log4j.appender.ROLLINGFILE.MaxFileSize=10MB -# uncomment the next line to limit number of backup files -#log4j.appender.ROLLINGFILE.MaxBackupIndex=10 - -log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n - - -# -# Add TRACEFILE to rootLogger to get log file output -# Log TRACE level and above messages to a log file -log4j.appender.TRACEFILE=org.apache.log4j.FileAppender -log4j.appender.TRACEFILE.Threshold=TRACE -log4j.appender.TRACEFILE.File=${hedwig.log.dir}/${hedwig.trace.file} - -log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout -### Notice we are including log4j's NDC here (%x) -log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/pom.xml ---------------------------------------------------------------------- diff --git a/hedwig-server/pom.xml b/hedwig-server/pom.xml deleted file mode 100644 index b460f25..0000000 --- a/hedwig-server/pom.xml +++ /dev/null @@ -1,294 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper</artifactId> - <version>4.4.0-SNAPSHOT</version> - </parent> - <properties> - <mainclass>org.apache.hedwig.server.netty.PubSubServer</mainclass> - <project.libdir>${basedir}/lib</project.libdir> - </properties> - <artifactId>hedwig-server</artifactId> - <packaging>jar</packaging> - <name>hedwig-server</name> - <url>http://maven.apache.org</url> - <dependencies> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.8.1</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.6.4</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.6.4</version> - </dependency> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-client</artifactId> - <version>${project.parent.version}</version> - <scope>compile</scope> - <type>jar</type> - </dependency> - <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <version>10.8.2.2</version> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>${zookeeper.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>${zookeeper.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper-server</artifactId> - <version>${project.parent.version}</version> - 
<scope>compile</scope> - <type>jar</type> - </dependency> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper-server</artifactId> - <version>${project.parent.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <!-- - Annoying dependency we need to include because - zookeeper uses log4j and so we transatively do, but - log4j has some dependencies which aren't in the - default maven repositories - //--> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>1.2.15</version> - <exclusions> - <exclusion> - <groupId>javax.mail</groupId> - <artifactId>mail</artifactId> - </exclusion> - <exclusion> - <groupId>javax.jms</groupId> - <artifactId>jms</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>jline</groupId> - <artifactId>jline</artifactId> - <version>0.9.94</version> - </dependency> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-server-compat420</artifactId> - <version>4.2.0</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper-server</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-server</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-protocol</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-client</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-server-compat410</artifactId> - <version>4.1.0</version> - <scope>test</scope> - <exclusions> - <exclusion> - 
<groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper-server</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-server</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-protocol</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-client</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-server-compat400</artifactId> - <version>4.0.0</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper-server</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-server</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-protocol</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>hedwig-client</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <version>0.7</version> - <configuration> - <excludes> - <exclude>**/p12.pass</exclude> - </excludes> - </configuration> - </plugin> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.2.1</version> - <configuration> - <descriptors> - <descriptor>../src/assemble/bin.xml</descriptor> - </descriptors> - </configuration> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - 
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile> - </configuration> - </plugin> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <id>createbuilddir</id> - <phase>generate-test-resources</phase> - <configuration> - <target> - <mkdir dir="target/zk_clientbase_build" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.libdir}</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <systemPropertyVariables> - <derby.stream.error.file>target/derby.log</derby.stream.error.file> - <build.test.dir>target/zk_clientbase_build</build.test.dir> - </systemPropertyVariables> - </configuration> - </plugin> - <plugin> - <artifactId>maven-clean-plugin</artifactId> - <version>2.5</version> - <configuration> - <filesets> - <fileset> - <directory>${project.libdir}</directory> - <followSymlinks>false</followSymlinks> - </fileset> - </filesets> - </configuration> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java deleted file mode 100644 index ec38fc2..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java +++ /dev/null @@ -1,547 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.admin; - -import java.util.Arrays; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange; -import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.meta.MetadataManagerFactory; -import org.apache.hedwig.server.meta.FactoryLayout; -import org.apache.hedwig.server.meta.SubscriptionDataManager; -import 
org.apache.hedwig.server.meta.TopicOwnershipManager;
import org.apache.hedwig.server.meta.TopicPersistenceManager;
import org.apache.hedwig.server.topics.HubInfo;
import org.apache.hedwig.server.topics.HubLoad;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import com.google.protobuf.ByteString;
import static com.google.common.base.Charsets.UTF_8;

/**
 * Hedwig Admin.
 *
 * Gives administrative tools synchronous access to Hedwig metadata (topic
 * ownership, topic persistence info and subscription data) through the
 * metadata managers created by the configured {@link MetadataManagerFactory},
 * plus direct access to the underlying ZooKeeper and BookKeeper clients.
 */
public class HedwigAdmin {
    private static final Logger LOG = LoggerFactory.getLogger(HedwigAdmin.class);

    // NOTE: now it is fixed passwd used in hedwig
    static byte[] passwd = "sillysecret".getBytes(UTF_8);

    protected final ZooKeeper zk;
    protected final BookKeeper bk;
    protected final MetadataManagerFactory mmFactory;
    protected final SubscriptionDataManager sdm;
    protected final TopicOwnershipManager tom;
    protected final TopicPersistenceManager tpm;

    // hub configurations
    protected final ServerConfiguration serverConf;
    // bookkeeper configurations
    protected final ClientConfiguration bkClientConf;

    // released by MyWatcher once the ZooKeeper session is connected
    protected final CountDownLatch zkReadyLatch = new CountDownLatch(1);

    // Watcher that releases zkReadyLatch when the session reports SyncConnected;
    // every other event is ignored.
    private class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (Event.KeeperState.SyncConnected.equals(event.getState())) {
                zkReadyLatch.countDown();
            }
        }
    }

    /**
     * Minimal rendezvous object used to turn a callback based metadata
     * operation into a blocking call: the callback reports the outcome via
     * {@link #success(Object)} or {@link #fail(PubSubException)} and the
     * caller waits for it in {@link #block()}.
     */
    static class SyncObj<T> {
        boolean finished = false;
        boolean success = false;
        T value = null;
        PubSubException exception = null;

        synchronized void success(T v) {
            finished = true;
            success = true;
            value = v;
            notifyAll();
        }

        synchronized void fail(PubSubException pse) {
            finished = true;
            success = false;
            exception = pse;
            notifyAll();
        }

        /**
         * Wait until an outcome has been reported. If the waiting thread is
         * interrupted the method returns early with the interrupt flag
         * restored; in that case <code>finished</code> is still false.
         */
        synchronized void block() {
            try {
                while (!finished) {
                    wait();
                }
            } catch (InterruptedException ie) {
                // keep the interruption visible to the caller instead of dropping it
                Thread.currentThread().interrupt();
            }
        }

        synchronized boolean isSuccess() {
            return success;
        }
    }

    /**
     * Stats of a hub
     */
    public static class HubStats {
        HubInfo hubInfo;
        HubLoad hubLoad;

        public HubStats(HubInfo info, HubLoad load) {
            this.hubInfo = info;
            this.hubLoad = load;
        }

        @Override
        public String toString() {
            // flatten the multi-line representations onto a single line
            String info = hubInfo.toString().trim().replace("\n", ", ");
            String load = hubLoad.toString().trim().replace("\n", ", ");
            StringBuilder sb = new StringBuilder();
            sb.append("info : [").append(info)
              .append("], load : [").append(load)
              .append("]");
            return sb.toString();
        }
    }

    /**
     * Hedwig Admin Constructor
     *
     * @param bkConf
     *          BookKeeper Client Configuration.
     * @param hubConf
     *          Hub Server Configuration.
     * @throws Exception
     *          if the ZooKeeper connection is not established within twice the
     *          configured timeout, or if any later initialisation step fails.
     *          The ZooKeeper handle is closed before the exception propagates.
     */
    public HedwigAdmin(ClientConfiguration bkConf, ServerConfiguration hubConf) throws Exception {
        this.serverConf = hubConf;
        this.bkClientConf = bkConf;

        // connect to zookeeper
        zk = new ZooKeeper(hubConf.getZkHost(), hubConf.getZkTimeout(), new MyWatcher());
        boolean initialized = false;
        try {
            LOG.debug("Connecting to zookeeper {}, timeout = {}",
                      hubConf.getZkHost(), hubConf.getZkTimeout());
            // wait until connection is ready
            if (!zkReadyLatch.await(hubConf.getZkTimeout() * 2, TimeUnit.MILLISECONDS)) {
                throw new Exception("Count not establish connection with ZooKeeper after " + hubConf.getZkTimeout() * 2 + " ms.");
            }

            // construct the metadata manager factory
            mmFactory = MetadataManagerFactory.newMetadataManagerFactory(hubConf, zk);
            tpm = mmFactory.newTopicPersistenceManager();
            tom = mmFactory.newTopicOwnershipManager();
            sdm = mmFactory.newSubscriptionDataManager();

            // connect to bookkeeper
            bk = new BookKeeper(bkClientConf, zk);
            LOG.debug("Connecting to bookkeeper");
            initialized = true;
        } finally {
            if (!initialized) {
                // do not leak the ZooKeeper session when construction fails
                try {
                    zk.close();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Close the hedwig admin.
     *
     * Every component is attempted even when an earlier one fails, so that a
     * failing metadata manager cannot leave the BookKeeper or ZooKeeper client
     * open.
     *
     * @throws Exception
     */
    public void close() throws Exception {
        try {
            closeMetadataManagers();
        } finally {
            closeClients();
        }
    }

    // Closes the three metadata managers and then shuts down their factory.
    private void closeMetadataManagers() throws Exception {
        try {
            tpm.close();
        } finally {
            try {
                tom.close();
            } finally {
                try {
                    sdm.close();
                } finally {
                    mmFactory.shutdown();
                }
            }
        }
    }

    // Closes the BookKeeper client and then the ZooKeeper handle it was built on.
    private void closeClients() throws Exception {
        try {
            bk.close();
        } finally {
            zk.close();
        }
    }

    /**
     * Return zookeeper handle used in hedwig admin.
     *
     * @return zookeeper handle
     */
    public ZooKeeper getZkHandle() {
        return zk;
    }

    /**
     * Return bookkeeper handle used in hedwig admin.
     *
     * @return bookkeeper handle
     */
    public BookKeeper getBkHandle() {
        return bk;
    }

    /**
     * Return hub server configuration used in hedwig admin
     *
     * @return hub server configuration
     */
    public ServerConfiguration getHubServerConf() {
        return serverConf;
    }

    /**
     * Return metadata manager factory.
     *
     * @return metadata manager factory instance.
     */
    public MetadataManagerFactory getMetadataManagerFactory() {
        return mmFactory;
    }

    /**
     * Return bookkeeper passwd used in hedwig admin
     *
     * @return a copy of the bookkeeper passwd
     */
    public byte[] getBkPasswd() {
        return Arrays.copyOf(passwd, passwd.length);
    }

    /**
     * Return digest type used in hedwig admin
     *
     * @return bookkeeper digest type
     */
    public DigestType getBkDigestType() {
        return DigestType.CRC32;
    }

    /**
     * Wait for the outcome reported to <code>syncObj</code> and unwrap it.
     *
     * @param syncObj
     *          rendezvous object handed to an asynchronous metadata operation
     * @return the value reported through {@link SyncObj#success(Object)}
     * @throws InterruptedException
     *          if the wait was interrupted before any outcome was reported
     * @throws Exception
     *          the {@link PubSubException} reported through
     *          {@link SyncObj#fail(PubSubException)}
     */
    private static <T> T awaitResult(SyncObj<T> syncObj) throws Exception {
        syncObj.block();
        synchronized (syncObj) {
            if (!syncObj.finished) {
                // block() only returns unfinished when the thread was interrupted;
                // there is no recorded exception to throw in that case.
                throw new InterruptedException("Interrupted while waiting for a metadata operation to complete");
            }
            if (!syncObj.success) {
                throw syncObj.exception;
            }
            return syncObj.value;
        }
    }

    /**
     * Does the topic exist?
     *
     * @param topic
     *            Topic name
     * @return whether topic exists or not?
     * @throws Exception
     */
    public boolean hasTopic(ByteString topic) throws Exception {
        // current persistence info is bound with a topic, so if there is persistence info
        // there is topic.
        final SyncObj<Boolean> syncObj = new SyncObj<Boolean>();
        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<LedgerRanges> result) {
                syncObj.success(null != result);
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        return awaitResult(syncObj);
    }

    /**
     * Get available hubs.
     *
     * Hubs whose znode cannot be read are skipped with a warning. If the
     * calling thread is interrupted the scan stops, the interrupt flag is
     * restored and the hubs read so far are returned.
     *
     * @return available hubs and their loads
     * @throws Exception
     */
    public Map<HedwigSocketAddress, HubStats> getAvailableHubs() throws Exception {
        String zkHubsPath = serverConf.getZkHostsPrefix(new StringBuilder()).toString();
        Map<HedwigSocketAddress, HubStats> hubs =
            new HashMap<HedwigSocketAddress, HubStats>();
        List<String> hosts = zk.getChildren(zkHubsPath, false);
        for (String host : hosts) {
            String zkHubPath = zkHubsPath + "/" + host;
            HedwigSocketAddress addr = new HedwigSocketAddress(host);
            try {
                Stat stat = new Stat();
                byte[] data = zk.getData(zkHubPath, false, stat);
                if (data == null) {
                    continue;
                }
                HubLoad load = HubLoad.parse(new String(data, UTF_8));
                HubInfo info = new HubInfo(addr, stat.getCzxid());
                hubs.put(addr, new HubStats(info, load));
            } catch (KeeperException ke) {
                LOG.warn("Couldn't read hub data from ZooKeeper", ke);
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted during read", ie);
                // further blocking reads would fail the same way; stop here
                Thread.currentThread().interrupt();
                break;
            }
        }
        return hubs;
    }

    /**
     * Get list of topics
     *
     * @return list of topics
     * @throws Exception
     */
    public Iterator<ByteString> getTopics() throws Exception {
        return mmFactory.getTopics();
    }

    /**
     * Return the topic owner of a topic
     *
     * @param topic
     *            Topic name
     * @return the address of the owner of a topic, or null when the ownership
     *         manager reports no owner info
     * @throws Exception
     */
    public HubInfo getTopicOwner(ByteString topic) throws Exception {
        final SyncObj<HubInfo> syncObj = new SyncObj<HubInfo>();
        tom.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<HubInfo> result) {
                syncObj.success(null == result ? null : result.getValue());
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        return awaitResult(syncObj);
    }

    private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger, MessageSeqId endOfLedger) {
        LedgerRange.Builder builder =
            LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
            .setEndSeqIdIncluded(endOfLedger);
        return builder.build();
    }

    /**
     * Open a ledger without recovery, read its last confirmed entry id and
     * close the handle again.
     *
     * @param ledgerId
     *            Ledger to inspect
     * @return the value returned by <code>LedgerHandle#readLastConfirmed()</code>
     * @throws Exception
     */
    private long readLastConfirmed(long ledgerId) throws Exception {
        LedgerHandle lh = bk.openLedgerNoRecovery(ledgerId, DigestType.CRC32, passwd);
        try {
            return lh.readLastConfirmed();
        } finally {
            // the handle is only needed for this one read
            lh.close();
        }
    }

    /**
     * Return the ledger range forming the topic
     *
     * Ranges that carry no start seq-id get one computed from the end of the
     * preceding range (or, for the first range, from the number of entries in
     * its ledger). A trailing range without an end seq-id gets one computed
     * from the ledger's last confirmed entry.
     *
     * @param topic
     *            Topic name
     * @return ledger ranges forming the topic, or null when no persistence
     *         info is reported for the topic
     * @throws IllegalStateException
     *             if a range other than the last one has no end seq-id
     * @throws Exception
     */
    public List<LedgerRange> getTopicLedgers(ByteString topic) throws Exception {
        final SyncObj<LedgerRanges> syncObj = new SyncObj<LedgerRanges>();
        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<LedgerRanges> result) {
                syncObj.success(null == result ? null : result.getValue());
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        LedgerRanges ranges = awaitResult(syncObj);
        if (null == ranges) {
            return null;
        }
        List<LedgerRange> results = new ArrayList<LedgerRange>();
        List<LedgerRange> lrs = ranges.getRangesList();
        long startSeqId = 1L;
        if (!lrs.isEmpty()) {
            LedgerRange first = lrs.get(0);
            if (!first.hasStartSeqIdIncluded() && first.hasEndSeqIdIncluded()) {
                // derive the start of the first range from its end and its entry count
                try {
                    long numEntries = readLastConfirmed(first.getLedgerId()) + 1;
                    long endOfLedger = first.getEndSeqIdIncluded().getLocalComponent();
                    startSeqId = endOfLedger - numEntries + 1;
                } catch (BKException.BKNoSuchLedgerExistsException be) {
                    // ledger is gone; keep the default start seq-id
                }
            }
        }
        Iterator<LedgerRange> lrIter = lrs.iterator();
        while (lrIter.hasNext()) {
            LedgerRange range = lrIter.next();
            if (range.hasEndSeqIdIncluded()) {
                long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
                if (range.hasStartSeqIdIncluded()) {
                    startSeqId = range.getStartSeqIdIncluded();
                } else {
                    range = buildLedgerRange(range.getLedgerId(), startSeqId, range.getEndSeqIdIncluded());
                }
                results.add(range);
                // the next range starts right after this one, never earlier
                if (startSeqId < endOfLedger + 1) {
                    startSeqId = endOfLedger + 1;
                }
                continue;
            }
            if (lrIter.hasNext()) {
                throw new IllegalStateException("Ledger " + range.getLedgerId() + " for topic " + topic.toStringUtf8()
                                                + " is not the last one but still does not have an end seq-id");
            }

            if (range.hasStartSeqIdIncluded()) {
                startSeqId = range.getStartSeqIdIncluded();
            }

            // last range without an end seq-id: derive it from the ledger itself
            long endOfLedger = startSeqId + readLastConfirmed(range.getLedgerId());
            MessageSeqId endSeqId = MessageSeqId.newBuilder().setLocalComponent(endOfLedger).build();
            results.add(buildLedgerRange(range.getLedgerId(), startSeqId, endSeqId));
        }
        return results;
    }

    /**
     * Return subscriptions of a topic
     *
     * @param topic
     *            Topic name
     * @return subscriptions of a topic (empty when none are reported)
     * @throws Exception
     */
    public Map<ByteString, SubscriptionData> getTopicSubscriptions(ByteString topic)
        throws Exception {

        final SyncObj<Map<ByteString, SubscriptionData>> syncObj =
            new SyncObj<Map<ByteString, SubscriptionData>>();
        sdm.readSubscriptions(topic, new Callback<Map<ByteString, Versioned<SubscriptionData>>>() {
            @Override
            public void operationFinished(Object ctx, Map<ByteString, Versioned<SubscriptionData>> result) {
                // It was just used to console tool to print some information, so don't need to return version for it
                // just keep the getTopicSubscriptions interface as before
                Map<ByteString, SubscriptionData> subs = new ConcurrentHashMap<ByteString, SubscriptionData>();
                // a null result must not throw here: an exception in the callback
                // would leave the waiting caller blocked forever
                if (null != result) {
                    for (Map.Entry<ByteString, Versioned<SubscriptionData>> subEntry : result.entrySet()) {
                        subs.put(subEntry.getKey(), subEntry.getValue().getValue());
                    }
                }
                syncObj.success(subs);
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        return awaitResult(syncObj);
    }

    /**
     * Return subscription state of a subscriber of topic
     *
     * @param topic
     *            Topic name
     * @param subscriber
     *            Subscriber name
     * @return subscription state, or null when no subscription data is reported
     * @throws Exception
     */
    public SubscriptionData getSubscription(ByteString topic, ByteString subscriber) throws Exception {
        final SyncObj<SubscriptionData> syncObj = new SyncObj<SubscriptionData>();
        sdm.readSubscriptionData(topic, subscriber, new Callback<Versioned<SubscriptionData>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<SubscriptionData> result) {
                syncObj.success(null == result ? null : result.getValue());
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        return awaitResult(syncObj);
    }

    /**
     * Format metadata for Hedwig.
     *
     * Formats the metadata through the current factory, deletes the stored
     * factory layout and then creates a metadata manager factory of the
     * configured class, which writes a new layout.
     *
     * @throws Exception
     */
    public void format() throws Exception {
        // format metadata first
        mmFactory.format(serverConf, zk);
        LOG.info("Formatted Hedwig metadata successfully.");
        // remove metadata layout
        FactoryLayout.deleteLayout(zk, serverConf);
        LOG.info("Removed old factory layout.");
        // create new metadata manager factory and write new metadata layout
        MetadataManagerFactory.createMetadataManagerFactory(serverConf, zk,
            serverConf.getMetadataManagerFactoryClass());
        LOG.info("Created new factory layout.");
    }
}
