http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java new file mode 100644 index 0000000..86d1c11 --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.ownership; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.apache.distributedlog.client.ClientConfig; +import com.twitter.finagle.stats.NullStatsReceiver; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.Set; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test Case for Ownership Cache. + */ +public class TestOwnershipCache { + + @Rule + public TestName runtime = new TestName(); + + private static OwnershipCache createOwnershipCache() { + ClientConfig clientConfig = new ClientConfig(); + return new OwnershipCache(clientConfig, null, + NullStatsReceiver.get(), NullStatsReceiver.get()); + } + + private static SocketAddress createSocketAddress(int port) { + return new InetSocketAddress("127.0.0.1", port); + } + + @Test(timeout = 60000) + public void testUpdateOwner() { + OwnershipCache cache = createOwnershipCache(); + SocketAddress addr = createSocketAddress(1000); + String stream = runtime.getMethodName(); + + assertTrue("Should successfully update owner if no owner exists before", + cache.updateOwner(stream, addr)); + assertEquals("Owner should be " + addr + " for stream " + stream, + addr, cache.getOwner(stream)); + assertTrue("Should successfully update owner if old owner is same", + cache.updateOwner(stream, addr)); + assertEquals("Owner should be " + addr + " for stream " + stream, + addr, cache.getOwner(stream)); + } + + @Test(timeout = 60000) + public void testRemoveOwnerFromStream() { + OwnershipCache cache = createOwnershipCache(); + int initialPort = 2000; + int numProxies = 2; + int numStreamsPerProxy = 2; + for (int i = 0; i < numProxies; i++) { + SocketAddress addr = createSocketAddress(initialPort + i); + for (int j = 0; j < numStreamsPerProxy; j++) { + String stream = runtime.getMethodName() + "_" + i + "_" + j; + cache.updateOwner(stream, addr); + } + } + Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", + numProxies * numStreamsPerProxy, ownershipMap.size()); + Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); + assertEquals("There should be " + numProxies + " proxies cached", + numProxies, ownershipDistribution.size()); + + String stream = runtime.getMethodName() + "_0_0"; + SocketAddress owner = createSocketAddress(initialPort); + + // remove non-existent mapping won't change anything + SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999); + cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr"); + assertEquals("Owner " + owner + " should not be removed", + owner, cache.getOwner(stream)); + ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", + numProxies * numStreamsPerProxy, ownershipMap.size()); + + // remove existent mapping should remove ownership mapping + cache.removeOwnerFromStream(stream, owner, "remove-owner"); + assertNull("Owner " + owner + " should be removed", cache.getOwner(stream)); + ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache", + numProxies * numStreamsPerProxy - 1, ownershipMap.size()); + ownershipDistribution = cache.getStreamOwnershipDistribution(); + assertEquals("There should still be " + numProxies + " proxies cached", + numProxies, ownershipDistribution.size()); + Set<String> ownedStreams = ownershipDistribution.get(owner); + assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner, + numStreamsPerProxy - 1, ownedStreams.size()); + assertFalse("Stream " + stream + " should not be owned by " + owner, + ownedStreams.contains(stream)); + } + + @Test(timeout = 60000) + public void testRemoveAllStreamsFromOwner() { + OwnershipCache cache = createOwnershipCache(); + int initialPort = 2000; + int numProxies = 2; + int numStreamsPerProxy = 2; + for (int i = 0; i < numProxies; i++) { + SocketAddress addr = createSocketAddress(initialPort + i); + for (int j = 0; j < numStreamsPerProxy; j++) { + String stream = runtime.getMethodName() + "_" + i + "_" + j; + cache.updateOwner(stream, addr); + } + } + Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", + numProxies * numStreamsPerProxy, ownershipMap.size()); + Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); + assertEquals("There should be " + numProxies + " proxies cached", + numProxies, ownershipDistribution.size()); + + SocketAddress owner = createSocketAddress(initialPort); + + // remove non-existent host won't change anything + SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999); + cache.removeAllStreamsFromOwner(nonExistentAddr); + ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache", + numProxies * numStreamsPerProxy, ownershipMap.size()); + ownershipDistribution = cache.getStreamOwnershipDistribution(); + assertEquals("There should still be " + numProxies + " proxies cached", + numProxies, ownershipDistribution.size()); + + // remove existent host should remove ownership mapping + cache.removeAllStreamsFromOwner(owner); + ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache", + (numProxies - 1) * numStreamsPerProxy, ownershipMap.size()); + ownershipDistribution = cache.getStreamOwnershipDistribution(); + assertEquals("There should be " + (numProxies - 1) + " proxies cached", + numProxies - 1, ownershipDistribution.size()); + assertFalse("Host " + owner + " should not be cached", + ownershipDistribution.containsKey(owner)); + } + + @Test(timeout = 60000) + public void testReplaceOwner() { + OwnershipCache cache = createOwnershipCache(); + int initialPort = 2000; + int numProxies = 2; + int numStreamsPerProxy = 2; + for (int i = 0; i < numProxies; i++) { + SocketAddress addr = createSocketAddress(initialPort + i); + for (int j = 0; j < numStreamsPerProxy; j++) { + String stream = runtime.getMethodName() + "_" + i + "_" + j; + cache.updateOwner(stream, addr); + } + } + Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", + numProxies * numStreamsPerProxy, ownershipMap.size()); + Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution(); + assertEquals("There should be " + numProxies + " proxies cached", + numProxies, ownershipDistribution.size()); + + String stream = runtime.getMethodName() + "_0_0"; + SocketAddress oldOwner = createSocketAddress(initialPort); + SocketAddress newOwner = createSocketAddress(initialPort + 999); + + cache.updateOwner(stream, newOwner); + assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner, + newOwner, cache.getOwner(stream)); + ownershipMap = cache.getStreamOwnerMapping(); + assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache", + numProxies * numStreamsPerProxy, ownershipMap.size()); + assertEquals("Owner of " + stream + " should be " + newOwner, + newOwner, ownershipMap.get(stream)); + ownershipDistribution = cache.getStreamOwnershipDistribution(); + assertEquals("There should be " + (numProxies + 1) + " proxies cached", + numProxies + 1, ownershipDistribution.size()); + Set<String> oldOwnedStreams = ownershipDistribution.get(oldOwner); + assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner, + numStreamsPerProxy - 1, oldOwnedStreams.size()); + assertFalse("Stream " + stream + " should not be owned by " + oldOwner, + oldOwnedStreams.contains(stream)); + Set<String> newOwnedStreams = ownershipDistribution.get(newOwner); + assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + newOwner, + 1, newOwnedStreams.size()); + assertTrue("Stream " + stream + " should be owned by " + newOwner, + newOwnedStreams.contains(stream)); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java new file mode 100644 index 0000000..8ef33bd --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.proxy; + +import org.apache.distributedlog.thrift.service.BulkWriteResponse; +import org.apache.distributedlog.thrift.service.ClientInfo; +import org.apache.distributedlog.thrift.service.DistributedLogService; +import org.apache.distributedlog.thrift.service.HeartbeatOptions; +import org.apache.distributedlog.thrift.service.ServerInfo; +import org.apache.distributedlog.thrift.service.WriteContext; +import org.apache.distributedlog.thrift.service.WriteResponse; +import com.twitter.util.Future; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Mock DistributedLog Related Services. + */ +public class MockDistributedLogServices { + + /** + * Mock basic service. + */ + static class MockBasicService implements DistributedLogService.ServiceIface { + + @Override + public Future<ServerInfo> handshake() { + return Future.value(new ServerInfo()); + } + + @Override + public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) { + return Future.value(new ServerInfo()); + } + + @Override + public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<WriteResponse> heartbeatWithOptions(String stream, + WriteContext ctx, + HeartbeatOptions options) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<WriteResponse> write(String stream, + ByteBuffer data) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<WriteResponse> writeWithContext(String stream, + ByteBuffer data, + WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<BulkWriteResponse> writeBulkWithContext(String stream, + List<ByteBuffer> data, + WriteContext ctx) { + return Future.value(new BulkWriteResponse()); + } + + @Override + public Future<WriteResponse> truncate(String stream, + String dlsn, + WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<WriteResponse> release(String stream, + WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<WriteResponse> create(String stream, WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<WriteResponse> delete(String stream, + WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<WriteResponse> getOwner(String stream, WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override + public Future<Void> setAcceptNewStream(boolean enabled) { + return Future.value(null); + } + } + + /** + * Mock server info service. + */ + public static class MockServerInfoService extends MockBasicService { + + protected ServerInfo serverInfo; + + public MockServerInfoService() { + serverInfo = new ServerInfo(); + } + + public void updateServerInfo(ServerInfo serverInfo) { + this.serverInfo = serverInfo; + } + + @Override + public Future<ServerInfo> handshake() { + return Future.value(serverInfo); + } + + @Override + public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) { + return Future.value(serverInfo); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java new file mode 100644 index 0000000..e38c2ed --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.proxy; + +import org.apache.distributedlog.thrift.service.DistributedLogService; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Mock Proxy Client Builder. + */ +class MockProxyClientBuilder implements ProxyClient.Builder { + + static class MockProxyClient extends ProxyClient { + MockProxyClient(SocketAddress address, + DistributedLogService.ServiceIface service) { + super(address, new MockThriftClient(), service); + } + } + + private final ConcurrentMap<SocketAddress, MockProxyClient> clients = + new ConcurrentHashMap<SocketAddress, MockProxyClient>(); + + public void provideProxyClient(SocketAddress address, + MockProxyClient proxyClient) { + clients.put(address, proxyClient); + } + + @Override + public ProxyClient build(SocketAddress address) { + return clients.get(address); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java new file mode 100644 index 0000000..ad1c878 --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.proxy; + +import com.twitter.finagle.Service; +import com.twitter.finagle.thrift.ThriftClientRequest; +import com.twitter.util.Future; + +/** + * Mock Thrift Client. + */ +class MockThriftClient extends Service<ThriftClientRequest, byte[]> { + @Override + public Future<byte[]> apply(ThriftClientRequest request) { + return Future.value(request.message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java new file mode 100644 index 0000000..6d9a471 --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.proxy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.distributedlog.client.ClientConfig; +import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService; +import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService; +import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient; +import org.apache.distributedlog.client.resolver.DefaultRegionResolver; +import org.apache.distributedlog.client.stats.ClientStats; +import org.apache.distributedlog.thrift.service.ServerInfo; +import com.twitter.finagle.stats.NullStatsReceiver; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.tuple.Pair; +import org.jboss.netty.util.HashedWheelTimer; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test Proxy Client Manager. + */ +public class TestProxyClientManager { + + @Rule + public TestName runtime = new TestName(); + + static class TestHostProvider implements HostProvider { + + Set<SocketAddress> hosts = new HashSet<SocketAddress>(); + + synchronized void addHost(SocketAddress host) { + hosts.add(host); + } + + @Override + public synchronized Set<SocketAddress> getHosts() { + return ImmutableSet.copyOf(hosts); + } + + } + + private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, + long periodicHandshakeIntervalMs) { + HostProvider provider = new TestHostProvider(); + return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs); + } + + private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, + HostProvider hostProvider, + long periodicHandshakeIntervalMs) { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs); + clientConfig.setPeriodicOwnershipSyncIntervalMs(-1); + HashedWheelTimer dlTimer = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(), + clientConfig.getRedirectBackoffStartMs(), + TimeUnit.MILLISECONDS); + return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider, + new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver())); + } + + private static SocketAddress createSocketAddress(int port) { + return new InetSocketAddress("127.0.0.1", port); + } + + private static MockProxyClient createMockProxyClient(SocketAddress address) { + return new MockProxyClient(address, new MockBasicService()); + } + + private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient( + SocketAddress address, ServerInfo serverInfo) { + MockServerInfoService service = new MockServerInfoService(); + MockProxyClient proxyClient = new MockProxyClient(address, service); + service.updateServerInfo(serverInfo); + return Pair.of(proxyClient, service); + } + + @Test(timeout = 60000) + public void testBasicCreateRemove() throws Exception { + SocketAddress address = createSocketAddress(1000); + MockProxyClientBuilder builder = new MockProxyClientBuilder(); + MockProxyClient mockProxyClient = createMockProxyClient(address); + builder.provideProxyClient(address, mockProxyClient); + + ProxyClientManager clientManager = createProxyClientManager(builder, 0L); + assertEquals("There should be no clients in the manager", + 0, clientManager.getNumProxies()); + ProxyClient proxyClient = clientManager.createClient(address); + assertEquals("Create client should build the proxy client", + 1, clientManager.getNumProxies()); + assertTrue("The client returned should be the same client that builder built", + mockProxyClient == proxyClient); + } + + @Test(timeout = 60000) + public void testGetShouldCreateClient() throws Exception { + SocketAddress address = createSocketAddress(2000); + MockProxyClientBuilder builder = new MockProxyClientBuilder(); + MockProxyClient mockProxyClient = createMockProxyClient(address); + builder.provideProxyClient(address, mockProxyClient); + + ProxyClientManager clientManager = createProxyClientManager(builder, 0L); + assertEquals("There should be no clients in the manager", + 0, clientManager.getNumProxies()); + ProxyClient proxyClient = clientManager.getClient(address); + assertEquals("Get client should build the proxy client", + 1, clientManager.getNumProxies()); + assertTrue("The client returned should be the same client that builder built", + mockProxyClient == proxyClient); + } + + @Test(timeout = 60000) + public void testConditionalRemoveClient() throws Exception { + SocketAddress address = createSocketAddress(3000); + MockProxyClientBuilder builder = new MockProxyClientBuilder(); + MockProxyClient mockProxyClient = createMockProxyClient(address); + MockProxyClient anotherMockProxyClient = createMockProxyClient(address); + builder.provideProxyClient(address, mockProxyClient); + + ProxyClientManager clientManager = createProxyClientManager(builder, 0L); + assertEquals("There should be no clients in the manager", + 0, clientManager.getNumProxies()); + clientManager.createClient(address); + assertEquals("Create client should build the proxy client", + 1, clientManager.getNumProxies()); + clientManager.removeClient(address, anotherMockProxyClient); + assertEquals("Conditional remove should not remove proxy client", + 1, clientManager.getNumProxies()); + clientManager.removeClient(address, mockProxyClient); + assertEquals("Conditional remove should remove proxy client", + 0, clientManager.getNumProxies()); + } + + @Test(timeout = 60000) + public void testRemoveClient() throws Exception { + SocketAddress address = createSocketAddress(3000); + MockProxyClientBuilder builder = new MockProxyClientBuilder(); + MockProxyClient mockProxyClient = createMockProxyClient(address); + builder.provideProxyClient(address, mockProxyClient); + + ProxyClientManager clientManager = createProxyClientManager(builder, 0L); + assertEquals("There should be no clients in the manager", + 0, clientManager.getNumProxies()); + clientManager.createClient(address); + assertEquals("Create client should build the proxy client", + 1, clientManager.getNumProxies()); + clientManager.removeClient(address); + assertEquals("Remove should remove proxy client", + 0, clientManager.getNumProxies()); + } + + @Test(timeout = 60000) + public void testCreateClientShouldHandshake() throws Exception { + SocketAddress address = createSocketAddress(3000); + MockProxyClientBuilder builder = new MockProxyClientBuilder(); + ServerInfo serverInfo = new ServerInfo(); + serverInfo.putToOwnerships(runtime.getMethodName() + "_stream", + runtime.getMethodName() + "_owner"); + Pair<MockProxyClient, MockServerInfoService> mockProxyClient = + createMockProxyClient(address, serverInfo); + builder.provideProxyClient(address, mockProxyClient.getLeft()); + + final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null); + final CountDownLatch doneLatch = new CountDownLatch(1); + ProxyListener listener = new ProxyListener() { + @Override + public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { + resultHolder.set(serverInfo); + doneLatch.countDown(); + } + @Override + public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { + } + }; + + ProxyClientManager clientManager = createProxyClientManager(builder, 0L); + clientManager.registerProxyListener(listener); + assertEquals("There should be no clients in the manager", + 0, clientManager.getNumProxies()); + clientManager.createClient(address); + assertEquals("Create client should build the proxy client", + 1, clientManager.getNumProxies()); + + // When a client is created, it would handshake with that proxy + doneLatch.await(); + assertEquals("Handshake should return server info", + serverInfo, resultHolder.get()); + } + + @Test(timeout = 60000) + public void testHandshake() throws Exception { + final int numHosts = 3; + final int numStreamsPerHost = 3; + final int initialPort = 4000; + + MockProxyClientBuilder builder = new MockProxyClientBuilder(); + Map<SocketAddress, ServerInfo> serverInfoMap = + new HashMap<SocketAddress, ServerInfo>(); + for (int i = 0; i < numHosts; i++) { + SocketAddress address = createSocketAddress(initialPort + i); + ServerInfo serverInfo = new ServerInfo(); + for (int j = 0; j < numStreamsPerHost; j++) { + serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j, + address.toString()); + } + Pair<MockProxyClient, MockServerInfoService> mockProxyClient = + createMockProxyClient(address, serverInfo); + builder.provideProxyClient(address, mockProxyClient.getLeft()); + serverInfoMap.put(address, serverInfo); + } + + final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>(); + final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts); + ProxyListener listener = new ProxyListener() { + @Override + public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { + synchronized (results) { + results.put(address, serverInfo); + } + doneLatch.countDown(); + } + + @Override + public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { + } + }; + + TestHostProvider rs = new TestHostProvider(); + ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L); + clientManager.registerProxyListener(listener); + assertEquals("There should be no clients in the manager", + 0, clientManager.getNumProxies()); + for (int i = 0; i < numHosts; i++) { + rs.addHost(createSocketAddress(initialPort + i)); + } + // handshake would handshake with 3 hosts again + clientManager.handshake(); + doneLatch.await(); + assertEquals("Handshake should return server info", + numHosts, results.size()); + assertTrue("Handshake should get all server infos", + Maps.difference(serverInfoMap, results).areEqual()); + } + + @Test(timeout = 60000) + public void testPeriodicHandshake() throws Exception { + final int numHosts = 3; + final int numStreamsPerHost = 3; + final int initialPort = 5000; + + MockProxyClientBuilder builder = new MockProxyClientBuilder(); + Map<SocketAddress, ServerInfo> serverInfoMap = + new HashMap<SocketAddress, ServerInfo>(); + Map<SocketAddress, MockServerInfoService> mockServiceMap = + new HashMap<SocketAddress, MockServerInfoService>(); + final Map<SocketAddress, CountDownLatch> hostDoneLatches = + new HashMap<SocketAddress, CountDownLatch>(); + for (int i = 0; i < numHosts; i++) { + SocketAddress address = createSocketAddress(initialPort + i); + ServerInfo serverInfo = new ServerInfo(); + for (int j = 0; j < numStreamsPerHost; j++) { + serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j, + address.toString()); + } + Pair<MockProxyClient, MockServerInfoService> mockProxyClient = + createMockProxyClient(address, serverInfo); + builder.provideProxyClient(address, mockProxyClient.getLeft()); + serverInfoMap.put(address, serverInfo); + mockServiceMap.put(address, mockProxyClient.getRight()); + hostDoneLatches.put(address, new CountDownLatch(2)); + } + + final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>(); + final CountDownLatch doneLatch = new CountDownLatch(numHosts); + ProxyListener listener = new ProxyListener() { + @Override + public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { + synchronized (results) { + results.put(address, serverInfo); + CountDownLatch latch = hostDoneLatches.get(address); + if (null != latch) { + latch.countDown(); + } + } + doneLatch.countDown(); + } + + @Override + public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { + } + }; + + TestHostProvider rs = new TestHostProvider(); + ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L); + clientManager.setPeriodicHandshakeEnabled(false); + clientManager.registerProxyListener(listener); + + assertEquals("There should be no clients in the manager", + 0, clientManager.getNumProxies()); + for (int i = 0; i < numHosts; i++) { + SocketAddress address = createSocketAddress(initialPort + i); + rs.addHost(address); + clientManager.createClient(address); + } + + // make sure the first 3 handshakes going through + doneLatch.await(); + + assertEquals("Handshake should return server info", + numHosts, results.size()); + assertTrue("Handshake should get all server infos", + Maps.difference(serverInfoMap, results).areEqual()); + + // update server info + for (int i = 0; i < numHosts; i++) { + SocketAddress address = createSocketAddress(initialPort + i); + ServerInfo serverInfo = new ServerInfo(); + for (int j = 0; j < numStreamsPerHost; j++) { + serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j, + address.toString()); + } + MockServerInfoService service = mockServiceMap.get(address); + serverInfoMap.put(address, serverInfo); + service.updateServerInfo(serverInfo); + } + + clientManager.setPeriodicHandshakeEnabled(true); + for (int i = 0; i < numHosts; i++) { + SocketAddress address = createSocketAddress(initialPort + i); + CountDownLatch latch = hostDoneLatches.get(address); + latch.await(); + } + + assertTrue("Periodic handshake should update all server infos", + Maps.difference(serverInfoMap, results).areEqual()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java new file mode 100644 index 0000000..f44cddd --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java @@ -0,0 +1,417 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.distributedlog.client.resolver.DefaultRegionResolver; +import org.apache.distributedlog.service.DLSocketAddress; +import com.twitter.finagle.Address; +import com.twitter.finagle.Addresses; +import com.twitter.finagle.ChannelWriteException; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.stats.NullStatsReceiver; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.junit.Test; + +/** + * Test Case for {@link ConsistentHashRoutingService}. + */ +public class TestConsistentHashRoutingService { + + @Test(timeout = 60000) + public void testBlackoutHost() throws Exception { + TestName name = new TestName(); + RoutingService routingService = ConsistentHashRoutingService.newBuilder() + .serverSet(new NameServerSet(name)) + .resolveFromName(true) + .numReplicas(997) + .blackoutSeconds(2) + .build(); + + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 3181); + Address address = Addresses.newInetAddress(inetAddress); + List<Address> addresses = new ArrayList<Address>(1); + addresses.add(address); + name.changeAddrs(addresses); + + routingService.startService(); + + RoutingService.RoutingContext routingContext = + RoutingService.RoutingContext.of(new DefaultRegionResolver()); + + String streamName = "test-blackout-host"; + assertEquals(inetAddress, routingService.getHost(streamName, routingContext)); + routingService.removeHost(inetAddress, new ChannelWriteException(new IOException("test exception"))); + try { + routingService.getHost(streamName, routingContext); + fail("Should fail to get host since no brokers are available"); + } catch (NoBrokersAvailableException nbae) { + // expected + } + + TimeUnit.SECONDS.sleep(3); + assertEquals(inetAddress, routingService.getHost(streamName, routingContext)); + + routingService.stopService(); + } + + @Test(timeout = 60000) + public void testPerformServerSetChangeOnName() throws Exception { + TestName name = new TestName(); + ConsistentHashRoutingService routingService = (ConsistentHashRoutingService) + ConsistentHashRoutingService.newBuilder() + .serverSet(new NameServerSet(name)) + .resolveFromName(true) + .numReplicas(997) + .build(); + + int basePort = 3180; + int numHosts = 4; + List<Address> addresses1 = Lists.newArrayListWithExpectedSize(4); + List<Address> addresses2 = Lists.newArrayListWithExpectedSize(4); + List<Address> addresses3 = Lists.newArrayListWithExpectedSize(4); + + // fill up the addresses1 + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); + Address address = Addresses.newInetAddress(inetAddress); + addresses1.add(address); + } + // fill up the addresses2 - overlap with addresses1 + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i); + Address address = Addresses.newInetAddress(inetAddress); + addresses2.add(address); + } + // fill up the addresses3 - not overlap with addresses2 + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); + Address address = Addresses.newInetAddress(inetAddress); + addresses3.add(address); + } + + final List<SocketAddress> leftAddresses = Lists.newArrayList(); + final List<SocketAddress> joinAddresses = Lists.newArrayList(); + + RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() { + @Override + public void onServerLeft(SocketAddress address) { + synchronized (leftAddresses) { + leftAddresses.add(address); + leftAddresses.notifyAll(); + } + } + + @Override + public void onServerJoin(SocketAddress address) { + synchronized (joinAddresses) { + joinAddresses.add(address); + joinAddresses.notifyAll(); + } + } + }; + + routingService.registerListener(routingListener); + name.changeAddrs(addresses1); + + routingService.startService(); + + synchronized (joinAddresses) { + while (joinAddresses.size() < numHosts) { + joinAddresses.wait(); + } + } + + // validate 4 nodes joined + synchronized (joinAddresses) { + assertEquals(numHosts, joinAddresses.size()); + } + synchronized (leftAddresses) { + assertEquals(0, leftAddresses.size()); + } + assertEquals(numHosts, routingService.shardId2Address.size()); + assertEquals(numHosts, routingService.address2ShardId.size()); + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + + // update addresses2 - 2 new hosts joined, 2 old hosts left + name.changeAddrs(addresses2); + synchronized (joinAddresses) { + while (joinAddresses.size() < numHosts + 2) { + joinAddresses.wait(); + } + } + synchronized (leftAddresses) { + while (leftAddresses.size() < numHosts - 2) { + leftAddresses.wait(); + } + } + assertEquals(numHosts, routingService.shardId2Address.size()); + assertEquals(numHosts, routingService.address2ShardId.size()); + + // first 2 shards should leave + for (int i = 0; i < 2; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); + assertFalse(routingService.address2ShardId.containsKey(inetAddress)); + } + + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + + // update addresses3 - 2 new hosts joined, 2 old hosts left + name.changeAddrs(addresses3); + synchronized (joinAddresses) { + while (joinAddresses.size() < numHosts + 2 + numHosts) { + joinAddresses.wait(); + } + } + synchronized (leftAddresses) { + while (leftAddresses.size() < numHosts - 2 + numHosts) { + leftAddresses.wait(); + } + } + assertEquals(numHosts, routingService.shardId2Address.size()); + assertEquals(numHosts, routingService.address2ShardId.size()); + + // first 6 shards should leave + for (int i = 0; i < 2 + numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); + assertFalse(routingService.address2ShardId.containsKey(inetAddress)); + } + // new 4 shards should exist + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + + } + + private static class TestServerSetWatcher implements ServerSetWatcher { + + final LinkedBlockingQueue<ImmutableSet<DLSocketAddress>> changeQueue = + new LinkedBlockingQueue<ImmutableSet<DLSocketAddress>>(); + final CopyOnWriteArrayList<ServerSetMonitor> monitors = + new CopyOnWriteArrayList<ServerSetMonitor>(); + + @Override + public void watch(ServerSetMonitor monitor) throws MonitorException { + monitors.add(monitor); + ImmutableSet<DLSocketAddress> change; + while ((change = changeQueue.poll()) != null) { + notifyChanges(change); + } + } + + void notifyChanges(ImmutableSet<DLSocketAddress> addresses) { + if (monitors.isEmpty()) { + changeQueue.add(addresses); + } else { + for (ServerSetMonitor monitor : monitors) { + monitor.onChange(addresses); + } + } + } + } + + @Test(timeout = 60000) + public void testPerformServerSetChangeOnServerSet() throws Exception { + TestServerSetWatcher serverSetWatcher = new TestServerSetWatcher(); + ConsistentHashRoutingService routingService = new ConsistentHashRoutingService( + serverSetWatcher, 997, Integer.MAX_VALUE, NullStatsReceiver.get()); + + int basePort = 3180; + int numHosts = 4; + Set<DLSocketAddress> addresses1 = Sets.newConcurrentHashSet(); + Set<DLSocketAddress> addresses2 = Sets.newConcurrentHashSet(); + Set<DLSocketAddress> addresses3 = Sets.newConcurrentHashSet(); + + // fill up the addresses1 + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); + DLSocketAddress dsa = new DLSocketAddress(i, inetAddress); + addresses1.add(dsa); + } + // fill up the addresses2 - overlap with addresses1 + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i); + DLSocketAddress dsa = new DLSocketAddress(i + 2, inetAddress); + addresses2.add(dsa); + } + // fill up the addresses3 - not overlap with addresses2 + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); + DLSocketAddress dsa = new DLSocketAddress(i, inetAddress); + addresses3.add(dsa); + } + + final List<SocketAddress> leftAddresses = Lists.newArrayList(); + final List<SocketAddress> joinAddresses = Lists.newArrayList(); + + RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() { + @Override + public void onServerLeft(SocketAddress address) { + synchronized (leftAddresses) { + leftAddresses.add(address); + leftAddresses.notifyAll(); + } + } + + @Override + public void onServerJoin(SocketAddress address) { + synchronized (joinAddresses) { + joinAddresses.add(address); + joinAddresses.notifyAll(); + } + } + }; + + routingService.registerListener(routingListener); + serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses1)); + + routingService.startService(); + + synchronized (joinAddresses) { + while (joinAddresses.size() < numHosts) { + joinAddresses.wait(); + } + } + + // validate 4 nodes joined + synchronized (joinAddresses) { + assertEquals(numHosts, joinAddresses.size()); + } + synchronized (leftAddresses) { + assertEquals(0, leftAddresses.size()); + } + assertEquals(numHosts, routingService.shardId2Address.size()); + assertEquals(numHosts, routingService.address2ShardId.size()); + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + assertEquals(i, shardId); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + + // update addresses2 - 2 new hosts joined, 2 old hosts left + serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses2)); + synchronized (joinAddresses) { + while (joinAddresses.size() < numHosts + 2) { + joinAddresses.wait(); + } + } + synchronized (leftAddresses) { + while (leftAddresses.size() < 2) { + leftAddresses.wait(); + } + } + + assertEquals(numHosts + 2, routingService.shardId2Address.size()); + assertEquals(numHosts + 2, routingService.address2ShardId.size()); + // first 2 shards should not leave + for (int i = 0; i < 2; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + assertEquals(i, shardId); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + assertEquals(i + 2, shardId); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + + // update addresses3 + serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses3)); + synchronized (joinAddresses) { + while (joinAddresses.size() < numHosts + 2 + numHosts) { + joinAddresses.wait(); + } + } + synchronized (leftAddresses) { + while (leftAddresses.size() < 2 + numHosts) { + leftAddresses.wait(); + } + } + assertEquals(numHosts + 2, routingService.shardId2Address.size()); + assertEquals(numHosts + 2, routingService.address2ShardId.size()); + + // first 4 shards should leave + for (int i = 0; i < numHosts; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + assertEquals(i, shardId); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + // the other 2 shards should be still there + for (int i = 0; i < 2; i++) { + InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + 2 + i); + assertTrue(routingService.address2ShardId.containsKey(inetAddress)); + int shardId = routingService.address2ShardId.get(inetAddress); + assertEquals(numHosts + i, shardId); + SocketAddress sa = routingService.shardId2Address.get(shardId); + assertNotNull(sa); + assertEquals(inetAddress, sa); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java new file mode 100644 index 0000000..59665b9 --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import com.google.common.collect.ImmutableSet; +import com.twitter.common.net.pool.DynamicHostSet; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Case for `inet` name resolution. + */ +public class TestInetNameResolution { + + private static final Logger logger = LoggerFactory.getLogger(TestRoutingService.class); + + @Test(timeout = 10000) + public void testInetNameResolution() throws Exception { + String nameStr = "inet!127.0.0.1:3181"; + final CountDownLatch resolved = new CountDownLatch(1); + final AtomicBoolean validationFailed = new AtomicBoolean(false); + + NameServerSet serverSet = new NameServerSet(nameStr); + serverSet.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() { + @Override + public void onChange(ImmutableSet<ServiceInstance> hostSet) { + if (hostSet.size() > 1) { + logger.error("HostSet has more elements than expected {}", hostSet); + validationFailed.set(true); + resolved.countDown(); + } else if (hostSet.size() == 1) { + ServiceInstance serviceInstance = hostSet.iterator().next(); + Endpoint endpoint = serviceInstance.getAdditionalEndpoints().get("thrift"); + InetSocketAddress address = new InetSocketAddress(endpoint.getHost(), endpoint.getPort()); + if (endpoint.getPort() != 3181) { + logger.error("Port does not match the expected port {}", endpoint.getPort()); + validationFailed.set(true); + } else if (!address.getAddress().getHostAddress().equals("127.0.0.1")) { + logger.error("Host address does not match the expected address {}", + address.getAddress().getHostAddress()); + validationFailed.set(true); + } + resolved.countDown(); + } + } + }); + + resolved.await(); + Assert.assertEquals(false, validationFailed.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java new file mode 100644 index 0000000..151663e --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.Sets; +import org.apache.distributedlog.client.resolver.DefaultRegionResolver; +import org.apache.distributedlog.thrift.service.StatusCode; +import com.twitter.finagle.NoBrokersAvailableException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; + +/** + * Test Case for {@link RegionsRoutingService}. + */ +public class TestRegionsRoutingService { + + @Test(timeout = 60000) + public void testRoutingListener() throws Exception { + int numRoutingServices = 5; + RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices]; + Set<SocketAddress> hosts = new HashSet<SocketAddress>(); + Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>(); + for (int i = 0; i < numRoutingServices; i++) { + String finagleNameStr = "inet!127.0.0.1:" + (3181 + i); + routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr); + SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i); + hosts.add(address); + regionMap.put(address, "region-" + i); + } + + final CountDownLatch doneLatch = new CountDownLatch(numRoutingServices); + final AtomicInteger numHostsLeft = new AtomicInteger(0); + final Set<SocketAddress> jointHosts = new HashSet<SocketAddress>(); + RegionsRoutingService regionsRoutingService = + RegionsRoutingService.newBuilder() + .routingServiceBuilders(routingServiceBuilders) + .resolver(new DefaultRegionResolver(regionMap)) + .build(); + regionsRoutingService.registerListener(new RoutingService.RoutingListener() { + @Override + public void onServerLeft(SocketAddress address) { + numHostsLeft.incrementAndGet(); + } + + @Override + public void onServerJoin(SocketAddress address) { + jointHosts.add(address); + doneLatch.countDown(); + } + }); + + regionsRoutingService.startService(); + + doneLatch.await(); + + assertEquals(numRoutingServices, jointHosts.size()); + assertEquals(0, numHostsLeft.get()); + assertTrue(Sets.difference(hosts, jointHosts).immutableCopy().isEmpty()); + } + + @Test(timeout = 60000) + public void testGetHost() throws Exception { + int numRoutingServices = 3; + RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices]; + Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>(); + for (int i = 0; i < numRoutingServices; i++) { + String finagleNameStr = "inet!127.0.0.1:" + (3181 + i); + routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr); + SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i); + regionMap.put(address, "region-" + i); + } + + RegionsRoutingService regionsRoutingService = + RegionsRoutingService.newBuilder() + .resolver(new DefaultRegionResolver(regionMap)) + .routingServiceBuilders(routingServiceBuilders) + .build(); + regionsRoutingService.startService(); + + RoutingService.RoutingContext routingContext = + RoutingService.RoutingContext.of(new DefaultRegionResolver()) + .addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION); + assertEquals(new InetSocketAddress("127.0.0.1", 3181), + regionsRoutingService.getHost("any", routingContext)); + + routingContext = + RoutingService.RoutingContext.of(new DefaultRegionResolver()) + .addTriedHost(new InetSocketAddress("127.0.0.1", 3181), StatusCode.WRITE_EXCEPTION); + assertEquals(new InetSocketAddress("127.0.0.1", 3182), + regionsRoutingService.getHost("any", routingContext)); + + // add 3182 to routing context as tried host + routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3182), StatusCode.WRITE_EXCEPTION); + assertEquals(new InetSocketAddress("127.0.0.1", 3183), + regionsRoutingService.getHost("any", routingContext)); + + // add 3183 to routing context as tried host + routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION); + try { + regionsRoutingService.getHost("any", routingContext); + fail("Should fail to get host since all regions are tried."); + } catch (NoBrokersAvailableException nbae) { + // expected + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java new file mode 100644 index 0000000..d2d61a9 --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.routing; + +import static org.junit.Assert.assertEquals; + +import org.apache.distributedlog.client.resolver.DefaultRegionResolver; +import com.twitter.finagle.Address; +import com.twitter.finagle.Addresses; +import com.twitter.finagle.addr.WeightedAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Case for {@link RoutingService}. + */ +@RunWith(Parameterized.class) +public class TestRoutingService { + + static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class); + + @Parameterized.Parameters + public static Collection<Object[]> configs() { + ArrayList<Object[]> list = new ArrayList<Object[]>(); + for (int i = 0; i <= 1; i++) { + for (int j = 0; j <= 1; j++) { + for (int k = 0; k <= 1; k++) { + list.add(new Boolean[] {i == 1, j == 1, k == 1}); + } + } + } + return list; + } + + private final boolean consistentHash; + private final boolean weightedAddresses; + private final boolean asyncResolution; + + public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) { + this.consistentHash = consistentHash; + this.weightedAddresses = weightedAddresses; + this.asyncResolution = asyncResolution; + } + + private List<Address> getAddresses(boolean weightedAddresses) { + ArrayList<Address> addresses = new ArrayList<Address>(); + addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3181))); + addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.2", 3181))); + addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.3", 3181))); + addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.4", 3181))); + addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.5", 3181))); + addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.6", 3181))); + addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.7", 3181))); + + if (weightedAddresses) { + ArrayList<Address> wAddresses = new ArrayList<Address>(); + for (Address address: addresses) { + wAddresses.add(WeightedAddress.apply(address, 1.0)); + } + return wAddresses; + } else { + return addresses; + } + } + + private void testRoutingServiceHelper(boolean consistentHash, + boolean weightedAddresses, + boolean asyncResolution) + throws Exception { + ExecutorService executorService = null; + final List<Address> addresses = getAddresses(weightedAddresses); + final TestName name = new TestName(); + RoutingService routingService; + if (consistentHash) { + routingService = ConsistentHashRoutingService.newBuilder() + .serverSet(new NameServerSet(name)) + .resolveFromName(true) + .numReplicas(997) + .build(); + } else { + routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder() + .serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build(); + } + + if (asyncResolution) { + executorService = Executors.newSingleThreadExecutor(); + executorService.submit(new Runnable() { + @Override + public void run() { + name.changeAddrs(addresses); + } + }); + } else { + name.changeAddrs(addresses); + } + routingService.startService(); + + HashSet<SocketAddress> mapping = new HashSet<SocketAddress>(); + + for (int i = 0; i < 1000; i++) { + for (int j = 0; j < 5; j++) { + String stream = "TestStream-" + i + "-" + j; + mapping.add(routingService.getHost(stream, + RoutingService.RoutingContext.of(new DefaultRegionResolver()))); + } + } + + assertEquals(mapping.size(), addresses.size()); + + if (null != executorService) { + executorService.shutdown(); + } + + } + + @Test(timeout = 5000) + public void testRoutingService() throws Exception { + testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java new file mode 100644 index 0000000..ab0cb58 --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.speculative; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +import com.twitter.util.CountDownLatch; +import com.twitter.util.Future; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test {@link TestDefaultSpeculativeRequestExecutionPolicy}. + */ +public class TestDefaultSpeculativeRequestExecutionPolicy { + + @Test(timeout = 20000, expected = IllegalArgumentException.class) + public void testInvalidBackoffMultiplier() throws Exception { + new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1); + } + + @Test(timeout = 20000, expected = IllegalArgumentException.class) + public void testInvalidMaxSpeculativeTimeout() throws Exception { + new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 2); + } + + @Test(timeout = 20000) + public void testSpeculativeRequests() throws Exception { + DefaultSpeculativeRequestExecutionPolicy policy = + new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2); + SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class); + + final AtomicInteger callCount = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(3); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return Future.value(callCount.incrementAndGet() < 3); + } finally { + latch.countDown(); + } + } + }).when(executor).issueSpeculativeRequest(); + + ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor(); + policy.initiateSpeculativeRequest(executorService, executor); + + latch.await(); + + assertEquals(40, policy.getNextSpeculativeRequestTimeout()); + } + + @Test(timeout = 20000) + public void testSpeculativeRequestsWithMaxTimeout() throws Exception { + DefaultSpeculativeRequestExecutionPolicy policy = + new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2); + SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class); + + final AtomicInteger callCount = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(3); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return Future.value(callCount.incrementAndGet() < 3); + } finally { + latch.countDown(); + } + } + }).when(executor).issueSpeculativeRequest(); + + ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor(); + policy.initiateSpeculativeRequest(executorService, executor); + + latch.await(); + + assertEquals(15, policy.getNextSpeculativeRequestTimeout()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java new file mode 100644 index 0000000..d2df9a5 --- /dev/null +++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import static org.junit.Assert.assertFalse; + +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Duration; +import org.junit.Test; + +/** + * Test Case of {@link org.apache.distributedlog.service.DistributedLogClientBuilder}. + */ +public class TestDistributedLogClientBuilder { + + @Test(timeout = 60000) + public void testBuildClientsFromSameBuilder() throws Exception { + DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() + .name("build-clients-from-same-builder") + .clientId(ClientId$.MODULE$.apply("test-builder")) + .finagleNameStr("inet!127.0.0.1:7001") + .streamNameRegex(".*") + .handshakeWithClientInfo(true) + .clientBuilder(ClientBuilder.get() + .hostConnectionLimit(1) + .connectTimeout(Duration.fromSeconds(1)) + .tcpConnectTimeout(Duration.fromSeconds(1)) + .requestTimeout(Duration.fromSeconds(10))); + DistributedLogClient client1 = builder.build(); + DistributedLogClient client2 = builder.build(); + assertFalse(client1 == client2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/test/resources/log4j.properties b/distributedlog-proxy-client/src/test/resources/log4j.properties new file mode 100644 index 0000000..3e51059 --- /dev/null +++ b/distributedlog-proxy-client/src/test/resources/log4j.properties @@ -0,0 +1,51 @@ +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +# +# DisributedLog Logging Configuration +# + +# Example with rolling log file +log4j.rootLogger=INFO, CONSOLE + +#disable zookeeper logging +log4j.logger.org.apache.zookeeper=OFF +#Set the bookkeeper level to warning +log4j.logger.org.apache.bookkeeper=INFO + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.Threshold=INFO +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n + +# Add ROLLINGFILE to rootLogger to get log file output +# Log DEBUG level and above messages to a log file +#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender +#log4j.appender.ROLLINGFILE.Threshold=INFO +#log4j.appender.ROLLINGFILE.File=distributedlog.log +#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout +#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm +#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n + +log4j.appender.R=org.apache.log4j.RollingFileAppender +log4j.appender.R.Threshold=TRACE +log4j.appender.R.File=target/error.log +log4j.appender.R.MaxFileSize=200MB +log4j.appender.R.MaxBackupIndex=7 +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n