Repository: aurora Updated Branches: refs/heads/master 103dae687 -> 1a391d75f
Lift the standard `ServerSet` encoding. This exposes the standard `ServerSet` `ServiceInstance` encoding to the `ServerSet` interface for conforming implementations to leverage. Bugs closed: AURORA-1468 Reviewed at https://reviews.apache.org/r/45829/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1a391d75 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1a391d75 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1a391d75 Branch: refs/heads/master Commit: 1a391d75f4604bb8a017d53e78d943c057bc6784 Parents: 103dae6 Author: John Sirois <[email protected]> Authored: Wed Apr 6 16:49:32 2016 -0600 Committer: John Sirois <[email protected]> Committed: Wed Apr 6 16:49:32 2016 -0600 ---------------------------------------------------------------------- .../aurora/common/zookeeper/ServerSet.java | 90 +++++++++++++++ .../aurora/common/zookeeper/ServerSetImpl.java | 114 +------------------ .../common/zookeeper/ServerSetImplTest.java | 49 -------- .../aurora/common/zookeeper/ServerSetTest.java | 78 +++++++++++++ .../aurora/common/zookeeper/ServerSetsTest.java | 7 +- 5 files changed, 172 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java index 6e32083..2d978c1 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java @@ -13,16 +13,105 @@ */ package org.apache.aurora.common.zookeeper; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; import java.net.InetSocketAddress; +import java.nio.charset.Charset; import java.util.Map; +import javax.annotation.Nullable; + +import com.google.common.base.Charsets; +import com.google.common.collect.Maps; +import com.google.gson.Gson; + +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; import org.apache.aurora.common.zookeeper.Group.JoinException; /** * A logical set of servers registered in ZooKeeper. Intended to be used by servers in a * common service to advertise their presence to server-set protocol-aware clients. + * + * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance + * rendezvous data to zookeeper so that standard clients can interoperate. */ public interface ServerSet { + + /** + * Encodes a {@link ServiceInstance} as a JSON object. + * + * This is the default encoding for service instance data in ZooKeeper. + */ + Codec<ServiceInstance> JSON_CODEC = new Codec<ServiceInstance>() { + class EndpointSchema { + private final String host; + private final int port; + + EndpointSchema(Endpoint endpoint) { + host = endpoint.getHost(); + port = endpoint.getPort(); + } + + Endpoint asEndpoint() { + return new Endpoint(host, port); + } + } + + class ServiceInstanceSchema { + private final EndpointSchema serviceEndpoint; + private final Map<String, EndpointSchema> additionalEndpoints; + private final Status status; + private final @Nullable Integer shard; + + ServiceInstanceSchema(ServiceInstance instance) { + serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); + if (instance.getAdditionalEndpoints() != null) { + additionalEndpoints = + Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new); + } else { + additionalEndpoints = Maps.newHashMap(); + } + status = instance.getStatus(); + shard = instance.isSetShard() ? instance.getShard() : null; + } + + ServiceInstance asServiceInstance() { + ServiceInstance instance = + new ServiceInstance( + serviceEndpoint.asEndpoint(), + Maps.transformValues(additionalEndpoints, EndpointSchema::asEndpoint), + status); + if (shard != null) { + instance.setShard(shard); + } + return instance; + } + } + + private final Charset encoding = Charsets.UTF_8; + private final Gson gson = new Gson(); + + @Override + public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { + Writer writer = new OutputStreamWriter(sink, encoding); + gson.toJson(new ServiceInstanceSchema(instance), writer); + writer.flush(); + } + + @Override + public ServiceInstance deserialize(InputStream source) throws IOException { + InputStreamReader reader = new InputStreamReader(source, encoding); + return gson.fromJson(reader, ServiceInstanceSchema.class).asServiceInstance(); + } + }; + /** * Attempts to join a server set for this logical service group. * @@ -41,6 +130,7 @@ public interface ServerSet { * A handle to a service endpoint's status data that allows updating it to track current events. */ interface EndpointStatus { + /** * Removes the endpoint from the server set. * http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java index 8b385b8..ace4980 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java @@ -14,30 +14,21 @@ package org.apache.aurora.common.zookeeper; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; import java.net.InetSocketAddress; -import java.nio.charset.Charset; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; @@ -45,12 +36,10 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; import com.google.common.util.concurrent.UncheckedExecutionException; -import com.google.gson.Gson; import org.apache.aurora.common.base.Command; import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; import org.apache.aurora.common.util.BackoffHelper; @@ -93,7 +82,7 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> * @param path the name-service path of the service to connect to */ public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { - this(zkClient, new Group(zkClient, acl, path), createCodec()); + this(zkClient, new Group(zkClient, acl, path), JSON_CODEC); } /** @@ -103,7 +92,7 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> * @param group the server group */ public ServerSetImpl(ZooKeeperClient zkClient, Group group) { - this(zkClient, group, createCodec()); + this(zkClient, group, JSON_CODEC); } /** @@ -357,103 +346,4 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> LOG.info(message.toString()); } } - - private static class EndpointSchema { - final String host; - final Integer port; - - EndpointSchema(Endpoint endpoint) { - Preconditions.checkNotNull(endpoint); - this.host = endpoint.getHost(); - this.port = endpoint.getPort(); - } - - String getHost() { - return host; - } - - Integer getPort() { - return port; - } - } - - private static class ServiceInstanceSchema { - final EndpointSchema serviceEndpoint; - final Map<String, EndpointSchema> additionalEndpoints; - final Status status; - final Integer shard; - - ServiceInstanceSchema(ServiceInstance instance) { - this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); - if (instance.getAdditionalEndpoints() != null) { - this.additionalEndpoints = Maps.transformValues( - instance.getAdditionalEndpoints(), - EndpointSchema::new - ); - } else { - this.additionalEndpoints = Maps.newHashMap(); - } - this.status = instance.getStatus(); - this.shard = instance.isSetShard() ? instance.getShard() : null; - } - - EndpointSchema getServiceEndpoint() { - return serviceEndpoint; - } - - Map<String, EndpointSchema> getAdditionalEndpoints() { - return additionalEndpoints; - } - - Status getStatus() { - return status; - } - - Integer getShard() { - return shard; - } - } - - /** - * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the - * __isset_bit_vector internal thrift struct field that tracks primitive types. - */ - private static class AdaptedJsonCodec implements Codec<ServiceInstance> { - private static final Charset ENCODING = Charsets.UTF_8; - private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class; - private final Gson gson = new Gson(); - - @Override - public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { - Writer w = new OutputStreamWriter(sink, ENCODING); - gson.toJson(new ServiceInstanceSchema(instance), CLASS, w); - w.flush(); - } - - @Override - public ServiceInstance deserialize(InputStream source) throws IOException { - ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS); - Endpoint primary = new Endpoint( - output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort()); - Map<String, Endpoint> additional = Maps.transformValues( - output.getAdditionalEndpoints(), - endpoint -> new Endpoint(endpoint.getHost(), endpoint.getPort()) - ); - ServiceInstance instance = - new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus()); - if (output.getShard() != null) { - instance.setShard(output.getShard()); - } - return instance; - } - } - - /** - * Returns a codec for {@link ServiceInstance} objects that translates to and from JSON. - * - * @return a new codec instance. - */ - public static Codec<ServiceInstance> createCodec() { - return new AdaptedJsonCodec(); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java index 37be70b..73049d8 100644 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.common.zookeeper; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; @@ -26,7 +25,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.net.pool.DynamicHostSet; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; @@ -44,7 +42,6 @@ import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createControl; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -194,52 +191,6 @@ public class ServerSetImplTest extends BaseZooKeeperTest { } @Test - public void testJsonCodecRoundtrip() throws Exception { - Codec<ServiceInstance> codec = ServerSetImpl.createCodec(); - ServiceInstance instance1 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http", new Endpoint("foo", 8080)), - Status.ALIVE) - .setShard(0); - byte[] data = ServerSets.serializeServiceInstance(instance1, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); - - ServiceInstance instance2 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), - Status.ALIVE); - data = ServerSets.serializeServiceInstance(instance2, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); - - ServiceInstance instance3 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.<String, Endpoint>of(), - Status.ALIVE); - data = ServerSets.serializeServiceInstance(instance3, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); - } - - @Test - public void testJsonCompatibility() throws IOException { - ServiceInstance instance = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http", new Endpoint("foo", 8080)), - Status.ALIVE).setShard(42); - - ByteArrayOutputStream results = new ByteArrayOutputStream(); - ServerSetImpl.createCodec().serialize(instance, results); - assertEquals( - "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}," - + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}}," - + "\"status\":\"ALIVE\"," - + "\"shard\":42}", - results.toString()); - } - - @Test public void testUnwatchOnException() throws Exception { IMocksControl control = createControl(); http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java new file mode 100644 index 0000000..b48c1f1 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import com.google.common.collect.ImmutableMap; + +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ServerSetTest { + + @Test + public void testJsonCodecRoundtrip() throws Exception { + Codec<ServiceInstance> codec = ServerSet.JSON_CODEC; + ServiceInstance instance1 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http", new Endpoint("foo", 8080)), + Status.ALIVE) + .setShard(0); + byte[] data = ServerSets.serializeServiceInstance(instance1, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + + ServiceInstance instance2 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), + Status.ALIVE); + data = ServerSets.serializeServiceInstance(instance2, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + + ServiceInstance instance3 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.<String, Endpoint>of(), + Status.ALIVE); + data = ServerSets.serializeServiceInstance(instance3, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + } + + @Test + public void testJsonCompatibility() throws IOException { + ServiceInstance instance = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http", new Endpoint("foo", 8080)), + Status.ALIVE).setShard(42); + + ByteArrayOutputStream results = new ByteArrayOutputStream(); + ServerSet.JSON_CODEC.serialize(instance, results); + assertEquals( + "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}," + + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}}," + + "\"status\":\"ALIVE\"," + + "\"shard\":42}", + results.toString()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java index 85b89d5..0e67191 100644 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java @@ -18,7 +18,6 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; -import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; @@ -33,12 +32,10 @@ public class ServerSetsTest { Map<String, Endpoint > additionalEndpoints = ImmutableMap.of(); Status status = Status.ALIVE; - Codec<ServiceInstance> codec = ServerSetImpl.createCodec(); - byte[] data = ServerSets.serializeServiceInstance( - endpoint, additionalEndpoints, status, codec); + endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC); - ServiceInstance instance = ServerSets.deserializeServiceInstance(data, codec); + ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC); assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort()); assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
