[ROCKETMQ-17] Develop a vendor-neutral open standard for distributed messaging: add messaging relay service standard ASF JIRA: https://issues.apache.org/jira/browse/ROCKETMQ-17
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/3735a3f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/3735a3f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/3735a3f4 Branch: refs/heads/spec Commit: 3735a3f4db44849d3d0b12970b1f64b7d277d675 Parents: 53eda1a Author: vintagewang <[email protected]> Authored: Fri Dec 30 17:00:19 2016 +0800 Committer: vintagewang <[email protected]> Committed: Fri Dec 30 17:00:19 2016 +0800 ---------------------------------------------------------------------- spec/code/pom.xml | 10 +- .../org/apache/openrelay/InvokeContext.java | 21 +++ .../java/org/apache/openrelay/KeyValue.java | 36 +++++ .../org/apache/openrelay/ServiceEndPoint.java | 95 +++++++++++++ .../openrelay/ServiceEndPointManager.java | 35 +++++ .../org/apache/openrelay/ServiceLifecycle.java | 24 ++++ .../apache/openrelay/ServiceLoadBalance.java | 33 +++++ .../org/apache/openrelay/ServiceProperties.java | 32 +++++ .../internal/ServiceEndPointAdapter.java | 132 +++++++++++++++++++ .../org/apache/openrelay/observer/Observer.java | 39 ++++++ 10 files changed, 455 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/pom.xml ---------------------------------------------------------------------- diff --git a/spec/code/pom.xml b/spec/code/pom.xml index 097431b..ec66577 100644 --- a/spec/code/pom.xml +++ b/spec/code/pom.xml @@ -13,6 +13,7 @@ <module>messaging-user-level-api/java</module> <module>messaging-user-level-samples/java</module> <module>messaging-wire-level-api</module> + <module>relay-user-level-api/java</module> </modules> <build> @@ -22,8 +23,8 @@ <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> - <source>1.6</source> - <target>1.6</target> + <source>1.7</source> + 
<target>1.7</target> <encoding>UTF-8</encoding> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> @@ -101,6 +102,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>relay-user-level-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java new file mode 100644 index 0000000..84c987d --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.openrelay; + +/** + * Context object that exposes a set of {@link KeyValue} properties through {@link #properties()}. + */ +public interface InvokeContext { + /** + * @return the {@link KeyValue} properties held by this context + */ + KeyValue properties(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java new file mode 100644 index 0000000..b79023c --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openrelay; + +/** + * String-keyed container offering typed {@code put} overloads and typed getters for int, long, double and String values. + * <p> + * Every {@code put} returns a {@link KeyValue}. NOTE(review): presumably the receiver itself, to allow call chaining; + * confirm against an implementation. What the getters return for a missing key is not specified in this interface. + */ +public interface KeyValue { + KeyValue put(final String key, final int value); + + KeyValue put(final String key, final long value); + + KeyValue put(final String key, final double value); + + KeyValue put(final String key, final String value); + + int getInt(final String key); + + long getLong(final String key); + + double getDouble(final String key); + + String getString(final String key); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java new file mode 100644 index 0000000..f48f93e --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.openrelay; + +import java.util.Properties; +import org.apache.openrelay.observer.Observer; + +public interface ServiceEndPoint extends ServiceLifecycle { + /** + * Register/re-register a service in a serviceEndPoint object. + * If the service has already been registered in the serviceEndPoint object, registering it duplicately will fail. + * + * @param service the service to publish in serviceEndPoint + */ + void publish(Object service); + + /** + * Like {@link #publish(Object)} but specifying {@code properties} + * that can be used to configure the published service. + * + * @param service the service to publish in serviceEndPoint + * @param properties the properties of the published service + */ + + void publish(Object service, Properties properties); + + /** + * Bind a service type to the serviceEndPoint and obtain a proxy through which the services provided by the + * service object can be called directly. + * + * @param type service type to bind in serviceEndPoint + * @param <T> type of the service proxy object + * @return service proxy object to bind + */ + <T> T bind(Class<T> type); + + /** + * Like {@link #bind(Class)} but specifying {@code properties} that can be used to configure the bound service. + * + * @param type service type to bind in serviceEndPoint + * @param properties the service bind properties + * @param <T> type of the service proxy object + * @return service proxy object to bind + */ + <T> T bind(Class<T> type, Properties properties); + + /** + * Like {@link #bind(Class, Properties)} but specifying {@code serviceLoadBalance} that can be used to select + * the endPoint target. + * + * @param type service type to bind in serviceConsumer + * @param properties the service bind properties + * @param serviceLoadBalance algorithm used to select the endPoint target + * @param <T> type of the service proxy object + * @return service proxy object to bind + */ + <T> T bind(Class<T> type, Properties properties, ServiceLoadBalance serviceLoadBalance); + + /** + * Register an observer in a serviceEndPoint object. 
Whenever the serviceEndPoint object publishes or binds a service + * object, the observers registered before are notified. + * + * @param observer observer object to register in the serviceEndPoint object + */ + void addObserver(Observer observer); + + /** + * Removes the given observer from the list of observers. + * <p> + * If the given observer has not been previously registered (i.e. it was + * never added) then this method call is a no-op. If it had been previously + * added then it will be removed. If it had been added more than once, then + * only the first occurrence will be removed. + * + * @param observer The observer to remove + */ + void deleteObserver(Observer observer); + + /** + * @return the {@link InvokeContext} of this serviceEndPoint + */ + InvokeContext invokeContext(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java new file mode 100644 index 0000000..49e07ab --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openrelay; + +import org.apache.openrelay.internal.ServiceEndPointAdapter; + +/** + * Static entry point for obtaining {@link ServiceEndPoint} instances; creation is delegated to + * {@link ServiceEndPointAdapter}. + */ +public class ServiceEndPointManager { + /** + * Same as {@link #getServiceEndPoint(String, KeyValue)} with {@code null} passed as the properties. + * + * @param url driver url handed on to the adapter + * @return the serviceEndPoint created by the adapter + * @throws Exception whatever the adapter throws while creating the serviceEndPoint + */ + public static ServiceEndPoint getServiceEndPoint(String url) throws Exception { + return getServiceEndPoint(url, null); + } + + /** + * Delegates to {@link ServiceEndPointAdapter#createServiceEndPoint(String, KeyValue)}. + * + * @param url driver url handed on to the adapter + * @param properties properties handed on to the adapter + * @return the serviceEndPoint created by the adapter + * @throws Exception whatever the adapter throws while creating the serviceEndPoint + */ + public static ServiceEndPoint getServiceEndPoint(String url, KeyValue properties) throws Exception { + return ServiceEndPointAdapter.createServiceEndPoint(url, properties); + } + + /** + * Currently always returns {@code null}; NOTE(review): looks like a placeholder for a {@link KeyValue} factory, confirm. + * + * @return {@code null} + */ + public static KeyValue buildKeyValue() { + return null; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java new file mode 100644 index 0000000..d0c82af --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openrelay; + +/** + * Lifecycle contract made of a {@link #start()} and a {@link #stop()} operation. + * <p> + * NOTE(review): ordering, idempotence and restart semantics are not specified in this interface. + */ +public interface ServiceLifecycle { + void start(); + + void stop(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java new file mode 100644 index 0000000..9c4bd17 --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openrelay; + +import java.util.Set; + +public interface ServiceLoadBalance { + /** + * Selects a set of eligible providerServicePoint objects from the set of providerServicePoint objects provided. + * Implementations apply a selection strategy, such as RoundRobin or Random, to pick the providerServicePoint + * objects that satisfy the application's needs. + * + * @param servicePropertiesList providerServicePoint objects to choose from. + * @return a set of eligible providerServicePoint objects + */ + Set<ServiceProperties> select(Set<ServiceProperties> servicePropertiesList); + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java new file mode 100644 index 0000000..423abd8 --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openrelay; + +/** + * Describes a service through three String attributes: {@code id}, {@code relayAddress} and {@code providerId}. + * <p> + * Each attribute has a no-argument method returning a String and a one-argument method returning void; + * NOTE(review): presumably these are the getter and the setter of the attribute, confirm against an implementation. + */ +public interface ServiceProperties { + String id(); + + void id(String id); + + String relayAddress(); + + void relayAddress(String address); + + String providerId(); + + void providerId(String id); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java new file mode 100644 index 0000000..06ce69a --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openrelay.internal; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.openrelay.KeyValue; +import org.apache.openrelay.ServiceEndPoint; + +public class ServiceEndPointAdapter { + private static final String PROTOCOL_NAME = "protocol"; + private static final String SPI_NAME = "spi"; + private static final String URL_NAME = "urls"; + private static final String URL = "url"; + private static final String DEFAULT_SERVICE_END_POINT = "rocketmq"; + private static final String DEFAULT_SERVICE_IMPL = "org.apache.rocketmq.openrelay.impl.ServiceEndPointStandardImpl"; + private static final String URL_SEPARATOR = ":"; + private static final String LIST_SEPARATOR = ","; + private static final String PARAM_SEPARATOR = "&"; + private static final String KV_SEPARATOR = "="; + private static Map<String, String> serviceEndPointClassMap = new HashMap<>(); + + static { + serviceEndPointClassMap.put(DEFAULT_SERVICE_END_POINT, DEFAULT_SERVICE_IMPL); + } + + private static Map<String, List<String>> parseURI(String uri) { + if (uri == null || uri.length() == 0) { + return new HashMap<>(); + } + + int spiIndex = 0; + int index = uri.indexOf(URL_SEPARATOR); + Map<String, List<String>> results = new HashMap<>(); + String protocol = uri.substring(0, index); + List<String> protocolSet = new ArrayList<>(); + protocolSet.add(protocol); + results.put(PROTOCOL_NAME, protocolSet); + if (index > 0) { + String spi; + spiIndex = uri.indexOf(URL_SEPARATOR, index + 1); + if (spiIndex > 0) { + spi = uri.substring(index + 1, spiIndex); + } + else { + spi = uri.substring(index + 1); + } + List<String> spiSet = new ArrayList<>(); + spiSet.add(spi); + results.put(SPI_NAME, spiSet); + } + if (spiIndex > 0) { + String urlList = 
uri.substring(spiIndex + 1); + String[] list = urlList.split(LIST_SEPARATOR); + if (list.length > 0) { + results.put(URL_NAME, Arrays.asList(list)); + } + } + return results; + } + + private static ServiceEndPoint instantiateServiceEndPoint(String driver, KeyValue properties) + throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, + InvocationTargetException, InstantiationException { + String serviceImpl = driver; + if (serviceImpl == null) + serviceImpl = DEFAULT_SERVICE_IMPL; + if (serviceEndPointClassMap.containsKey(driver)) + serviceImpl = serviceEndPointClassMap.get(driver); + Class<?> serviceEndPointClass = Class.forName(serviceImpl); + if (serviceEndPointClass == null) + return null; + + if (properties.getString(URL) != null) { + String[] propertySplits = ((String)properties.getString(URL)).split(PARAM_SEPARATOR); + if (propertySplits.length > 0) { + for (int index = 1; index < propertySplits.length; index++) { + String[] kv = propertySplits[index].split(KV_SEPARATOR); + properties.put(kv[0], kv[1]); + } + } + } + Class[] paramTypes = {Properties.class}; + Constructor constructor = serviceEndPointClass.getConstructor(paramTypes); + assert constructor != null; + return (ServiceEndPoint)constructor.newInstance(properties); + } + + private static ServiceEndPoint createServiceEndPoint(Map<String, List<String>> url, KeyValue properties) + throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, + InstantiationException, IllegalAccessException { + List<String> driver = url.get(SPI_NAME); + List<String> urls = url.get(URL_NAME); + Collections.shuffle(urls); + Collections.shuffle(driver); + if (urls.size() > 0) + properties.put(URL, urls.get(0)); + return ServiceEndPointAdapter.instantiateServiceEndPoint(driver.get(0), properties); + } + + public static ServiceEndPoint createServiceEndPoint(String url, KeyValue properties) + throws ClassNotFoundException, NoSuchMethodException, InstantiationException, + 
IllegalAccessException, InvocationTargetException { + Map<String, List<String>> driverUrl = parseURI(url); + if (null == driverUrl || driverUrl.size() == 0) { + throw new IllegalArgumentException("driver url parsed result.size ==0"); + } + return createServiceEndPoint(driverUrl, properties); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java ---------------------------------------------------------------------- diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java new file mode 100644 index 0000000..097d309 --- /dev/null +++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openrelay.observer; + +import java.util.Observable; + +public interface Observer<T> { + /** + * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. 
+ * <p> + * The {@link Observable} will not call this method if it calls {@link #onError}. + */ + void onCompleted(); + + /** + * Notifies the Observer that the {@link Observable} has experienced an error condition. + * <p> + * If the {@link Observable} calls this method, it will not thereafter call + * {@link #onCompleted}. + * <p> + * NOTE(review): {@link Observable} resolves to {@code java.util.Observable} through this file's import; + * confirm that this is the intended notifier type. + * + * @param e the exception encountered by the Observable + */ + void onError(Throwable e); +}
