http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java new file mode 100644 index 0000000..58b5b2a --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import org.apache.distributedlog.DLMTestUtil; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.client.DistributedLogClientImpl; +import org.apache.distributedlog.client.resolver.DefaultRegionResolver; +import org.apache.distributedlog.client.routing.LocalRoutingService; +import org.apache.distributedlog.client.routing.RegionsRoutingService; +import org.apache.distributedlog.service.DistributedLogCluster.DLServer; +import org.apache.distributedlog.service.stream.StreamManager; +import org.apache.distributedlog.service.stream.StreamManagerImpl; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Duration; +import java.net.SocketAddress; +import java.net.URI; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +/** + * Base test case for distributedlog servers. + */ +public abstract class DistributedLogServerTestCase { + + protected static DistributedLogConfiguration conf = + new DistributedLogConfiguration().setLockTimeout(10) + .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10); + protected static DistributedLogConfiguration noAdHocConf = + new DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false) + .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10); + protected static DistributedLogCluster dlCluster; + protected static DistributedLogCluster noAdHocCluster; + + /** + * A distributedlog client wrapper for testing. 
+ */ + protected static class DLClient { + public final LocalRoutingService routingService; + public DistributedLogClientBuilder dlClientBuilder; + public final DistributedLogClientImpl dlClient; + + protected DLClient(String name, + String streamNameRegex, + Optional<String> serverSideRoutingFinagleName) { + routingService = LocalRoutingService.newBuilder().build(); + dlClientBuilder = DistributedLogClientBuilder.newBuilder() + .name(name) + .clientId(ClientId$.MODULE$.apply(name)) + .routingService(routingService) + .streamNameRegex(streamNameRegex) + .handshakeWithClientInfo(true) + .clientBuilder(ClientBuilder.get() + .hostConnectionLimit(1) + .connectionTimeout(Duration.fromSeconds(1)) + .requestTimeout(Duration.fromSeconds(60))); + if (serverSideRoutingFinagleName.isPresent()) { + dlClientBuilder = + dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get()); + } + dlClient = (DistributedLogClientImpl) dlClientBuilder.build(); + } + + public void handshake() { + dlClient.handshake(); + } + + public void shutdown() { + dlClient.close(); + } + } + + /** + * A distributedlog client wrapper that talks to two regions. 
+ */ + protected static class TwoRegionDLClient { + + public final LocalRoutingService localRoutingService; + public final LocalRoutingService remoteRoutingService; + public final DistributedLogClientBuilder dlClientBuilder; + public final DistributedLogClientImpl dlClient; + + protected TwoRegionDLClient(String name, Map<SocketAddress, String> regionMap) { + localRoutingService = new LocalRoutingService(); + remoteRoutingService = new LocalRoutingService(); + RegionsRoutingService regionsRoutingService = + RegionsRoutingService.of(new DefaultRegionResolver(regionMap), + localRoutingService, remoteRoutingService); + dlClientBuilder = DistributedLogClientBuilder.newBuilder() + .name(name) + .clientId(ClientId$.MODULE$.apply(name)) + .routingService(regionsRoutingService) + .streamNameRegex(".*") + .handshakeWithClientInfo(true) + .maxRedirects(2) + .clientBuilder(ClientBuilder.get() + .hostConnectionLimit(1) + .connectionTimeout(Duration.fromSeconds(1)) + .requestTimeout(Duration.fromSeconds(10))); + dlClient = (DistributedLogClientImpl) dlClientBuilder.build(); + } + + public void shutdown() { + dlClient.close(); + } + } + + private final boolean clientSideRouting; + protected DLServer dlServer; + protected DLClient dlClient; + protected DLServer noAdHocServer; + protected DLClient noAdHocClient; + + public static DistributedLogCluster createCluster(DistributedLogConfiguration conf) throws Exception { + return DistributedLogCluster.newBuilder() + .numBookies(3) + .shouldStartZK(true) + .zkServers("127.0.0.1") + .shouldStartProxy(false) + .dlConf(conf) + .bkConf(DLMTestUtil.loadTestBkConf()) + .build(); + } + + @BeforeClass + public static void setupCluster() throws Exception { + dlCluster = createCluster(conf); + dlCluster.start(); + } + + public void setupNoAdHocCluster() throws Exception { + noAdHocCluster = createCluster(noAdHocConf); + noAdHocCluster.start(); + noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false); + Optional<String> 
serverSideRoutingFinagleName = Optional.absent(); + if (!clientSideRouting) { + serverSideRoutingFinagleName = + Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress())); + } + noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName); + } + + public void tearDownNoAdHocCluster() throws Exception { + if (null != noAdHocClient) { + noAdHocClient.shutdown(); + } + if (null != noAdHocServer) { + noAdHocServer.shutdown(); + } + } + + @AfterClass + public static void teardownCluster() throws Exception { + if (null != dlCluster) { + dlCluster.stop(); + } + if (null != noAdHocCluster) { + noAdHocCluster.stop(); + } + } + + protected static URI getUri() { + return dlCluster.getUri(); + } + + protected DistributedLogServerTestCase(boolean clientSideRouting) { + this.clientSideRouting = clientSideRouting; + } + + @Before + public void setup() throws Exception { + dlServer = createDistributedLogServer(7001); + Optional<String> serverSideRoutingFinagleName = Optional.absent(); + if (!clientSideRouting) { + serverSideRoutingFinagleName = + Optional.of("inet!" 
+ DLSocketAddress.toString(dlServer.getAddress())); + } + dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName); + } + + @After + public void teardown() throws Exception { + if (null != dlClient) { + dlClient.shutdown(); + } + if (null != dlServer) { + dlServer.shutdown(); + } + } + + protected DLServer createDistributedLogServer(int port) throws Exception { + return new DLServer(conf, dlCluster.getUri(), port, false); + } + + protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port) + throws Exception { + return new DLServer(conf, dlCluster.getUri(), port, false); + } + + protected DLClient createDistributedLogClient(String clientName, + Optional<String> serverSideRoutingFinagleName) + throws Exception { + return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName); + } + + protected DLClient createDistributedLogClient(String clientName, + String streamNameRegex, + Optional<String> serverSideRoutingFinagleName) + throws Exception { + return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName); + } + + protected TwoRegionDLClient createTwoRegionDLClient(String clientName, + Map<SocketAddress, String> regionMap) + throws Exception { + return new TwoRegionDLClient(clientName, regionMap); + } + + protected static void checkStreams(int numExpectedStreams, DLServer dlServer) { + StreamManager streamManager = dlServer.dlServer.getKey().getStreamManager(); + assertEquals(numExpectedStreams, streamManager.numCached()); + assertEquals(numExpectedStreams, streamManager.numAcquired()); + } + + protected static void checkStreams(Set<String> streams, DLServer dlServer) { + StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); + Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); + Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet(); + + assertEquals(streams.size(), cachedStreams.size()); + 
assertEquals(streams.size(), acquiredStreams.size()); + assertTrue(Sets.difference(streams, cachedStreams).isEmpty()); + assertTrue(Sets.difference(streams, acquiredStreams).isEmpty()); + } + + protected static void checkStream(String name, DLClient dlClient, DLServer dlServer, + int expectedNumProxiesInClient, int expectedClientCacheSize, + int expectedServerCacheSize, boolean existedInServer, boolean existedInClient) { + Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution(); + assertEquals(expectedNumProxiesInClient, distribution.size()); + + if (expectedNumProxiesInClient > 0) { + Map.Entry<SocketAddress, Set<String>> localEntry = + distribution.entrySet().iterator().next(); + assertEquals(dlServer.getAddress(), localEntry.getKey()); + assertEquals(expectedClientCacheSize, localEntry.getValue().size()); + assertEquals(existedInClient, localEntry.getValue().contains(name)); + } + + StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); + Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); + Set<String> acquiredStreams = streamManager.getCachedStreams().keySet(); + + assertEquals(expectedServerCacheSize, cachedStreams.size()); + assertEquals(existedInServer, cachedStreams.contains(name)); + assertEquals(expectedServerCacheSize, acquiredStreams.size()); + assertEquals(existedInServer, acquiredStreams.contains(name)); + } + + protected static Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(DLClient dlClient) { + return dlClient.dlClient.getStreamOwnershipDistribution(); + } + + protected static Set<String> getAllStreamsFromDistribution(Map<SocketAddress, Set<String>> distribution) { + Set<String> allStreams = new HashSet<String>(); + for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) { + allStreams.addAll(entry.getValue()); + } + return allStreams; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java new file mode 100644 index 0000000..4a5dd01 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java @@ -0,0 +1,720 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import static com.google.common.base.Charsets.UTF_8; +import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.base.Optional; +import org.apache.distributedlog.AsyncLogReader; +import org.apache.distributedlog.DLMTestUtil; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.LogReader; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.TestZooKeeperClientBuilder; +import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.client.routing.LocalRoutingService; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.impl.acl.ZKAccessControl; +import org.apache.distributedlog.impl.metadata.BKDLConfig; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.service.stream.StreamManagerImpl; +import org.apache.distributedlog.thrift.AccessControlEntry; +import org.apache.distributedlog.thrift.service.BulkWriteResponse; +import org.apache.distributedlog.thrift.service.HeartbeatOptions; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteContext; +import org.apache.distributedlog.util.FailpointUtils; +import org.apache.distributedlog.util.FutureUtils; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId$; 
+import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.Future; +import com.twitter.util.Futures; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Case for {@link DistributedLogServer}. + */ +public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase { + + private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class); + + @Rule + public TestName testName = new TestName(); + + protected TestDistributedLogServerBase(boolean clientSideRouting) { + super(clientSideRouting); + } + + /** + * {@link https://issues.apache.org/jira/browse/DL-27}. + */ + @DistributedLogAnnotations.FlakyTest + @Ignore + @Test(timeout = 60000) + public void testBasicWrite() throws Exception { + String name = "dlserver-basic-write"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + for (long i = 1; i <= 10; i++) { + logger.debug("Write entry {} to stream {}.", i, name); + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes()))); + } + + HeartbeatOptions hbOptions = new HeartbeatOptions(); + hbOptions.setSendHeartBeatToReader(true); + // make sure the first log segment of each stream created + FutureUtils.result(dlClient.dlClient.heartbeat(name)); + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(1); + int numRead = 0; + LogRecord r = reader.readNext(false); + while (null != r) { + ++numRead; + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead, i); + r = reader.readNext(false); + } + assertEquals(10, numRead); + 
reader.close(); + dlm.close(); + } + + /** + * Sanity check to make sure both checksum flag values work. + */ + @Test(timeout = 60000) + public void testChecksumFlag() throws Exception { + String name = "testChecksumFlag"; + LocalRoutingService routingService = LocalRoutingService.newBuilder().build(); + routingService.addHost(name, dlServer.getAddress()); + DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder() + .name(name) + .clientId(ClientId$.MODULE$.apply("test")) + .routingService(routingService) + .handshakeWithClientInfo(true) + .clientBuilder(ClientBuilder.get() + .hostConnectionLimit(1) + .connectionTimeout(Duration.fromSeconds(1)) + .requestTimeout(Duration.fromSeconds(60))) + .checksum(false); + DistributedLogClient dlClient = dlClientBuilder.build(); + Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes()))); + dlClient.close(); + + dlClient = dlClientBuilder.checksum(true).build(); + Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes()))); + dlClient.close(); + } + + private void runSimpleBulkWriteTest(int writeCount) throws Exception { + String name = String.format("dlserver-bulk-write-%d", writeCount); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + + logger.debug("Write {} entries to stream {}.", writeCount, name); + List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + assertEquals(futures.size(), writeCount); + for (Future<DLSN> future : futures) { + // No throw == pass. 
+ DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + } + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(1); + int numRead = 0; + LogRecord r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead + 1, i); + ++numRead; + r = reader.readNext(false); + } + assertEquals(writeCount, numRead); + reader.close(); + dlm.close(); + } + + @Test(timeout = 60000) + public void testBulkWrite() throws Exception { + runSimpleBulkWriteTest(100); + } + + @Test(timeout = 60000) + public void testBulkWriteSingleWrite() throws Exception { + runSimpleBulkWriteTest(1); + } + + @Test(timeout = 60000) + public void testBulkWriteEmptyList() throws Exception { + String name = String.format("dlserver-bulk-write-%d", 0); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); + List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + + assertEquals(0, futures.size()); + } + + @Test(timeout = 60000) + public void testBulkWriteNullArg() throws Exception { + + String name = String.format("dlserver-bulk-write-%s", "null"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); + writes.add(null); + + try { + dlClient.dlClient.writeBulk(name, writes); + fail("should not have succeeded"); + } catch (NullPointerException npe) { + // expected + logger.info("Expected to catch NullPointException."); + } + } + + @Test(timeout = 60000) + public void testBulkWriteEmptyBuffer() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "empty"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); + writes.add(ByteBuffer.wrap(("").getBytes())); + writes.add(ByteBuffer.wrap(("").getBytes())); + List<Future<DLSN>> 
futures = dlClient.dlClient.writeBulk(name, writes); + assertEquals(2, futures.size()); + for (Future<DLSN> future : futures) { + // No throw == pass + DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + } + } + + void failDueToWrongException(Exception ex) { + logger.info("testBulkWritePartialFailure: ", ex); + fail(String.format("failed with wrong exception %s", ex.getClass().getName())); + } + + int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) { + int failed = 0; + for (int i = start; i < finish; i++) { + Future<DLSN> future = futures.get(i); + try { + Await.result(future, Duration.fromSeconds(10)); + fail("future should have failed!"); + } catch (DLException cre) { + ++failed; + } catch (Exception ex) { + failDueToWrongException(ex); + } + } + return failed; + } + + void validateFailedAsLogRecordTooLong(Future<DLSN> future) { + try { + Await.result(future, Duration.fromSeconds(10)); + fail("should have failed"); + } catch (DLException dle) { + assertEquals(StatusCode.TOO_LARGE_RECORD.getValue(), dle.getCode()); + } catch (Exception ex) { + failDueToWrongException(ex); + } + } + + @Test(timeout = 60000) + public void testBulkWritePartialFailure() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "partial-failure"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + final int writeCount = 100; + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + // Too big, will cause partial failure. + ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); + writes.add(buf); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + + // Count succeeded. 
+ List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + int succeeded = 0; + for (int i = 0; i < writeCount; i++) { + Future<DLSN> future = futures.get(i); + try { + Await.result(future, Duration.fromSeconds(10)); + ++succeeded; + } catch (Exception ex) { + failDueToWrongException(ex); + } + } + + validateFailedAsLogRecordTooLong(futures.get(writeCount)); + FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1))); + assertEquals(writeCount, succeeded); + } + + @Test(timeout = 60000) + public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "first-write-failed"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + final int writeCount = 100; + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); + ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); + writes.add(buf); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + + List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + validateFailedAsLogRecordTooLong(futures.get(0)); + FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1))); + } + + @Test(timeout = 60000) + public void testBulkWriteTotalFailureLostLock() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "lost-lock"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + final int writeCount = 8; + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); + ByteBuffer buf = ByteBuffer.allocate(8); + writes.add(buf); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + // Warm it up with a write. + Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8))); + + // Failpoint a lost lock, make sure the failure gets promoted to an operation failure. 
+ DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft(); + try { + FailpointUtils.setFailpoint( + FailpointUtils.FailPointName.FP_WriteInternalLostLock, + FailpointUtils.FailPointActions.FailPointAction_Default + ); + Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext()); + assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code); + } finally { + FailpointUtils.removeFailpoint( + FailpointUtils.FailPointName.FP_WriteInternalLostLock + ); + } + } + + @Test(timeout = 60000) + public void testHeartbeat() throws Exception { + String name = "dlserver-heartbeat"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + for (long i = 1; i <= 10; i++) { + logger.debug("Send heartbeat {} to stream {}.", i, name); + dlClient.dlClient.check(name).get(); + } + + logger.debug("Write entry one to stream {}.", name); + dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get(); + + Thread.sleep(1000); + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); + int numRead = 0; + // eid=0 => control records + // other 9 heartbeats will not trigger writing any control records. 
+ // eid=1 => user entry + long startEntryId = 1; + LogRecordWithDLSN r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead + 1, i); + assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0); + ++numRead; + ++startEntryId; + r = reader.readNext(false); + } + assertEquals(1, numRead); + } + + @Test(timeout = 60000) + public void testFenceWrite() throws Exception { + String name = "dlserver-fence-write"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + for (long i = 1; i <= 10; i++) { + logger.debug("Write entry {} to stream {}.", i, name); + dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); + } + + Thread.sleep(1000); + + logger.info("Fencing stream {}.", name); + DLMTestUtil.fenceStream(conf, getUri(), name); + logger.info("Fenced stream {}.", name); + + for (long i = 11; i <= 20; i++) { + logger.debug("Write entry {} to stream {}.", i, name); + dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); + } + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(1); + int numRead = 0; + LogRecord r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead + 1, i); + ++numRead; + r = reader.readNext(false); + } + assertEquals(20, numRead); + reader.close(); + dlm.close(); + } + + @Test(timeout = 60000) + public void testDeleteStream() throws Exception { + String name = "dlserver-delete-stream"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + long txid = 101; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + dlClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + + checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); + + 
dlClient.dlClient.delete(name).get(); + + checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); + + Thread.sleep(1000); + + DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri()); + AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN)); + try { + FutureUtils.result(reader101.readNext()); + fail("Should fail with LogNotFoundException since the stream is deleted"); + } catch (LogNotFoundException lnfe) { + // expected + } + FutureUtils.result(reader101.asyncClose()); + dlm101.close(); + + txid = 201; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + DLSN dlsn = dlClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + Thread.sleep(1000); + + DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader201 = dlm201.getInputStream(1); + int numRead = 0; + int curTxId = 201; + LogRecord r = reader201.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(curTxId++, i); + ++numRead; + r = reader201.readNext(false); + } + assertEquals(10, numRead); + reader201.close(); + dlm201.close(); + } + + @Test(timeout = 60000) + public void testCreateStream() throws Exception { + try { + setupNoAdHocCluster(); + final String name = "dlserver-create-stream"; + + noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress()); + assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); + + long txid = 101; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + noAdHocClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + + 
assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + } finally { + tearDownNoAdHocCluster(); + } + } + + /** + * This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing. + */ + @Test(timeout = 60000) + public void testCreateStreamTwice() throws Exception { + try { + setupNoAdHocCluster(); + final String name = "dlserver-create-stream-twice"; + + noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress()); + assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); + + long txid = 101; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + noAdHocClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + + assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + + // create again + assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); + assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + } finally { + tearDownNoAdHocCluster(); + } + } + + + + @Test(timeout = 60000) + public void testTruncateStream() throws Exception { + String name = "dlserver-truncate-stream"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + long txid = 1; + Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>(); + for (int s = 1; s <= 2; s++) { + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + DLSN dlsn = dlClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + txid2DLSN.put(curTxId, dlsn); + } + if (s == 1) { + dlClient.dlClient.release(name).get(); + } + } + + DLSN dlsnToDelete = txid2DLSN.get(11L); + dlClient.dlClient.truncate(name, dlsnToDelete).get(); + + DistributedLogManager readDLM 
= DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = readDLM.getInputStream(1); + int numRead = 0; + int curTxId = 11; + LogRecord r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(curTxId++, i); + ++numRead; + r = reader.readNext(false); + } + assertEquals(10, numRead); + reader.close(); + readDLM.close(); + } + + @Test(timeout = 60000) + public void testRequestDenied() throws Exception { + String name = "request-denied"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + AccessControlEntry ace = new AccessControlEntry(); + ace.setDenyWrite(true); + ZooKeeperClient zkc = TestZooKeeperClientBuilder + .newBuilder() + .uri(getUri()) + .connectionTimeoutMs(60000) + .sessionTimeoutMs(60000) + .build(); + DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace(); + BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri()); + String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name; + ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath); + accessControl.create(zkc); + + AccessControlManager acm = dlNamespace.createAccessControlManager(); + while (acm.allowWrite(name)) { + Thread.sleep(100); + } + + try { + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); + fail("Should fail with request denied exception"); + } catch (DLException dle) { + assertEquals(StatusCode.REQUEST_DENIED.getValue(), dle.getCode()); + } + } + + @Test(timeout = 60000) + public void testNoneStreamNameRegex() throws Exception { + String streamNamePrefix = "none-stream-name-regex-"; + int numStreams = 5; + Set<String> streams = new HashSet<String>(); + + for (int i = 0; i < numStreams; i++) { + streams.add(streamNamePrefix + i); + } + testStreamNameRegex(streams, ".*", streams); + } + + @Test(timeout = 60000) + public void testStreamNameRegex() throws Exception { + String 
streamNamePrefix = "stream-name-regex-"; + int numStreams = 5; + Set<String> streams = new HashSet<String>(); + Set<String> expectedStreams = new HashSet<String>(); + String streamNameRegex = streamNamePrefix + "1"; + + for (int i = 0; i < numStreams; i++) { + streams.add(streamNamePrefix + i); + } + expectedStreams.add(streamNamePrefix + "1"); + + testStreamNameRegex(streams, streamNameRegex, expectedStreams); + } + + private void testStreamNameRegex(Set<String> streams, String streamNameRegex, + Set<String> expectedStreams) + throws Exception { + for (String streamName : streams) { + dlClient.routingService.addHost(streamName, dlServer.getAddress()); + Await.result(dlClient.dlClient.write(streamName, + ByteBuffer.wrap(streamName.getBytes(UTF_8)))); + } + + DLClient client = createDistributedLogClient( + "test-stream-name-regex", + streamNameRegex, + Optional.<String>absent()); + try { + client.routingService.addHost("unknown", dlServer.getAddress()); + client.handshake(); + Map<SocketAddress, Set<String>> distribution = + client.dlClient.getStreamOwnershipDistribution(); + assertEquals(1, distribution.size()); + Set<String> cachedStreams = distribution.values().iterator().next(); + assertNotNull(cachedStreams); + assertEquals(expectedStreams.size(), cachedStreams.size()); + + for (String streamName : cachedStreams) { + assertTrue(expectedStreams.contains(streamName)); + } + } finally { + client.shutdown(); + } + } + + @Test(timeout = 60000) + public void testReleaseStream() throws Exception { + String name = "dlserver-release-stream"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); + checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); + + // release the stream + Await.result(dlClient.dlClient.release(name)); + checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); + } + + /** + * Verify stream ownership as seen by the client and by the server. + * + * <p>On the client side the ownership distribution must contain {@code expectedNumProxiesInClient} + * proxies; when that number is positive, the first entry must belong to {@code owner} and hold + * {@code expectedClientCacheSize} streams. On the server side both the cached and the acquired + * stream sets must hold {@code expectedServerCacheSize} streams. + * + * @param expectedNumProxiesInClient expected number of proxies in the client ownership distribution + * @param expectedClientCacheSize expected number of streams the client maps to the first proxy + * @param expectedServerCacheSize expected number of cached, and of acquired, streams on the server + * @param name name of the stream to look up + * @param owner expected address of the first proxy in the client distribution + * @param existedInServer whether the server is expected to have the stream cached and acquired + * @param existedInClient whether the client is expected to map the stream to the first proxy + */ + protected void checkStream(int expectedNumProxiesInClient, int 
expectedClientCacheSize, int expectedServerCacheSize, + String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) { + Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution(); + assertEquals(expectedNumProxiesInClient, distribution.size()); + + // the per-proxy checks only make sense when the client knows at least one proxy + if (expectedNumProxiesInClient > 0) { + Map.Entry<SocketAddress, Set<String>> localEntry = + distribution.entrySet().iterator().next(); + assertEquals(owner, localEntry.getKey()); + assertEquals(expectedClientCacheSize, localEntry.getValue().size()); + assertEquals(existedInClient, localEntry.getValue().contains(name)); + } + + + StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); + Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); + // read the acquired view itself; reading the cached view twice left acquisition unchecked + Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet(); + + assertEquals(expectedServerCacheSize, cachedStreams.size()); + assertEquals(existedInServer, cachedStreams.contains(name)); + assertEquals(expectedServerCacheSize, acquiredStreams.size()); + assertEquals(existedInServer, acquiredStreams.contains(name)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java new file mode 100644 index 0000000..c7ae960 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.fail; + +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.util.Await; +import java.nio.ByteBuffer; +import org.junit.Test; + +/** + * Test the server with client side routing. 
+ */ +public class TestDistributedLogServerClientRouting extends TestDistributedLogServerBase { + + public TestDistributedLogServerClientRouting() { + super(true); + } + + @Test(timeout = 60000) + public void testAcceptNewStream() throws Exception { + String name = "dlserver-accept-new-stream"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + dlClient.routingService.setAllowRetrySameHost(false); + + Await.result(dlClient.dlClient.setAcceptNewStream(false)); + + try { + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); + fail("Should fail because the proxy couldn't accept new stream"); + } catch (NoBrokersAvailableException nbae) { + // expected + } + checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); + + Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true)); + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); + checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java new file mode 100644 index 0000000..12416a3 --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +/** + * Test the server with server side routing. + */ +public class TestDistributedLogServerServerRouting extends TestDistributedLogServerBase { + + public TestDistributedLogServerServerRouting() { + super(false); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java new file mode 100644 index 0000000..4a2d65f --- /dev/null +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java @@ -0,0 +1,833 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.TestDistributedLogBase; +import org.apache.distributedlog.acl.DefaultAccessControlManager; +import org.apache.distributedlog.client.routing.LocalRoutingService; +import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; +import org.apache.distributedlog.exceptions.StreamUnavailableException; +import org.apache.distributedlog.protocol.util.ProtocolUtils; +import org.apache.distributedlog.service.config.NullStreamConfigProvider; +import org.apache.distributedlog.service.config.ServerConfiguration; +import org.apache.distributedlog.service.placement.EqualLoadAppraiser; +import org.apache.distributedlog.service.stream.Stream; +import org.apache.distributedlog.service.stream.StreamImpl; +import org.apache.distributedlog.service.stream.StreamImpl.StreamStatus; +import org.apache.distributedlog.service.stream.StreamManagerImpl; +import org.apache.distributedlog.service.stream.WriteOp; +import org.apache.distributedlog.service.streamset.DelimiterStreamPartitionConverter; +import 
org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.distributedlog.thrift.service.HeartbeatOptions; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteContext; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.ConfUtils; +import org.apache.distributedlog.util.FutureUtils; +import com.twitter.util.Await; +import com.twitter.util.Future; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.feature.SettableFeature; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.configuration.ConfigurationException; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Case for DistributedLog Service. 
+ */ +public class TestDistributedLogService extends TestDistributedLogBase { + + private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class); + + @Rule + public TestName testName = new TestName(); + + private ServerConfiguration serverConf; + private DistributedLogConfiguration dlConf; + private URI uri; + private final CountDownLatch latch = new CountDownLatch(1); + private DistributedLogServiceImpl service; + + @Before + @Override + public void setup() throws Exception { + super.setup(); + dlConf = new DistributedLogConfiguration(); + dlConf.addConfiguration(conf); + dlConf.setLockTimeout(0) + .setOutputBufferSize(0) + .setPeriodicFlushFrequencyMilliSeconds(10) + .setSchedulerShutdownTimeoutMs(100); + serverConf = newLocalServerConf(); + uri = createDLMURI("/" + testName.getMethodName()); + ensureURICreated(uri); + service = createService(serverConf, dlConf, latch); + } + + @After + @Override + public void teardown() throws Exception { + if (null != service) { + service.shutdown(); + } + super.teardown(); + } + + private DistributedLogConfiguration newLocalConf() { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(dlConf); + return confLocal; + } + + private ServerConfiguration newLocalServerConf() { + ServerConfiguration serverConf = new ServerConfiguration(); + serverConf.loadConf(dlConf); + serverConf.setServerThreads(1); + return serverConf; + } + + private DistributedLogServiceImpl createService( + ServerConfiguration serverConf, + DistributedLogConfiguration dlConf) throws Exception { + return createService(serverConf, dlConf, new CountDownLatch(1)); + } + + private DistributedLogServiceImpl createService( + ServerConfiguration serverConf, + DistributedLogConfiguration dlConf, + CountDownLatch latch) throws Exception { + // Build the stream partition converter + StreamPartitionConverter converter; + try { + converter = 
ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass()); + } catch (ConfigurationException e) { + logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}", + IdentityStreamPartitionConverter.class.getName()); + converter = new IdentityStreamPartitionConverter(); + } + return new DistributedLogServiceImpl( + serverConf, + dlConf, + ConfUtils.getConstDynConf(dlConf), + new NullStreamConfigProvider(), + uri, + converter, + new LocalRoutingService(), + NullStatsLogger.INSTANCE, + NullStatsLogger.INSTANCE, + latch, + new EqualLoadAppraiser()); + } + + private StreamImpl createUnstartedStream(DistributedLogServiceImpl service, + String name) throws Exception { + StreamImpl stream = (StreamImpl) service.newStream(name); + stream.initialize(); + return stream; + } + + private ByteBuffer createRecord(long txid) { + return ByteBuffer.wrap(("record-" + txid).getBytes(UTF_8)); + } + + private WriteOp createWriteOp(DistributedLogServiceImpl service, + String streamName, + long txid) { + ByteBuffer data = createRecord(txid); + return service.newWriteOp(streamName, data, null); + } + + @Test(timeout = 60000) + public void testAcquireStreams() throws Exception { + String streamName = testName.getMethodName(); + StreamImpl s0 = createUnstartedStream(service, streamName); + ServerConfiguration serverConf1 = new ServerConfiguration(); + serverConf1.addConfiguration(serverConf); + serverConf1.setServerPort(9999); + DistributedLogServiceImpl service1 = createService(serverConf1, dlConf); + StreamImpl s1 = createUnstartedStream(service1, streamName); + + // create write ops + WriteOp op0 = createWriteOp(service, streamName, 0L); + s0.submit(op0); + + WriteOp op1 = createWriteOp(service1, streamName, 1L); + s1.submit(op1); + + // check pending size + assertEquals("Write Op 0 should be pending in service 0", + 1, s0.numPendingOps()); + assertEquals("Write Op 1 should be pending in service 1", + 1, s1.numPendingOps()); + + // start 
acquiring s0 + s0.start(); + WriteResponse wr0 = Await.result(op0.result()); + assertEquals("Op 0 should succeed", + StatusCode.SUCCESS, wr0.getHeader().getCode()); + assertEquals("Service 0 should acquire stream", + StreamStatus.INITIALIZED, s0.getStatus()); + assertNotNull(s0.getManager()); + assertNotNull(s0.getWriter()); + assertNull(s0.getLastException()); + + // start acquiring s1 + s1.start(); + WriteResponse wr1 = Await.result(op1.result()); + assertEquals("Op 1 should fail", + StatusCode.FOUND, wr1.getHeader().getCode()); + // the stream will be set to ERROR and then be closed. + assertTrue("Service 1 should be in unavailable state", + StreamStatus.isUnavailable(s1.getStatus())); + assertNotNull(s1.getManager()); + assertNull(s1.getWriter()); + assertNotNull(s1.getLastException()); + assertTrue(s1.getLastException() instanceof OwnershipAcquireFailedException); + + service1.shutdown(); + } + + @Test(timeout = 60000) + public void testAcquireStreamsWhenExceedMaxCachedPartitions() throws Exception { + String streamName = testName.getMethodName() + "_0000"; + + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(dlConf); + confLocal.setMaxCachedPartitionsPerProxy(1); + + ServerConfiguration serverConfLocal = new ServerConfiguration(); + serverConfLocal.addConfiguration(serverConf); + serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class); + + DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal); + Stream stream = serviceLocal.getLogWriter(streamName); + + // stream is cached + assertNotNull(stream); + assertEquals(1, serviceLocal.getStreamManager().numCached()); + + // create write ops + WriteOp op0 = createWriteOp(service, streamName, 0L); + stream.submit(op0); + WriteResponse wr0 = Await.result(op0.result()); + assertEquals("Op 0 should succeed", + StatusCode.SUCCESS, wr0.getHeader().getCode()); + assertEquals(1, 
serviceLocal.getStreamManager().numAcquired()); + + // should fail to acquire another partition + try { + serviceLocal.getLogWriter(testName.getMethodName() + "_0001"); + fail("Should fail to acquire new streams"); + } catch (StreamUnavailableException sue) { + // expected + } + assertEquals(1, serviceLocal.getStreamManager().numCached()); + assertEquals(1, serviceLocal.getStreamManager().numAcquired()); + + // should be able to acquire partitions from other streams + String anotherStreamName = testName.getMethodName() + "-another_0001"; + Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName); + assertNotNull(anotherStream); + assertEquals(2, serviceLocal.getStreamManager().numCached()); + + // create write ops + WriteOp op1 = createWriteOp(service, anotherStreamName, 0L); + anotherStream.submit(op1); + WriteResponse wr1 = Await.result(op1.result()); + assertEquals("Op 1 should succeed", + StatusCode.SUCCESS, wr1.getHeader().getCode()); + assertEquals(2, serviceLocal.getStreamManager().numAcquired()); + } + + @Test(timeout = 60000) + public void testAcquireStreamsWhenExceedMaxAcquiredPartitions() throws Exception { + String streamName = testName.getMethodName() + "_0000"; + + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(dlConf); + confLocal.setMaxCachedPartitionsPerProxy(-1); + confLocal.setMaxAcquiredPartitionsPerProxy(1); + + ServerConfiguration serverConfLocal = new ServerConfiguration(); + serverConfLocal.addConfiguration(serverConf); + serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class); + + DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal); + Stream stream = serviceLocal.getLogWriter(streamName); + + // stream is cached + assertNotNull(stream); + assertEquals(1, serviceLocal.getStreamManager().numCached()); + + // create write ops + WriteOp op0 = createWriteOp(service, streamName, 0L); + stream.submit(op0); + 
WriteResponse wr0 = Await.result(op0.result()); + assertEquals("Op 0 should succeed", + StatusCode.SUCCESS, wr0.getHeader().getCode()); + assertEquals(1, serviceLocal.getStreamManager().numAcquired()); + + // should be able to cache partitions from same stream + String anotherStreamName = testName.getMethodName() + "_0001"; + Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName); + assertNotNull(anotherStream); + assertEquals(2, serviceLocal.getStreamManager().numCached()); + + // create write ops + WriteOp op1 = createWriteOp(service, anotherStreamName, 0L); + anotherStream.submit(op1); + WriteResponse wr1 = Await.result(op1.result()); + assertEquals("Op 1 should fail", + StatusCode.STREAM_UNAVAILABLE, wr1.getHeader().getCode()); + assertEquals(1, serviceLocal.getStreamManager().numAcquired()); + } + + @Test(timeout = 60000) + public void testCloseShouldErrorOutPendingOps() throws Exception { + String streamName = testName.getMethodName(); + StreamImpl s = createUnstartedStream(service, streamName); + + int numWrites = 10; + List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); + for (int i = 0; i < numWrites; i++) { + WriteOp op = createWriteOp(service, streamName, i); + s.submit(op); + futureList.add(op.result()); + } + assertEquals(numWrites, s.numPendingOps()); + Await.result(s.requestClose("close stream")); + assertEquals("Stream " + streamName + " is set to " + StreamStatus.CLOSED, + StreamStatus.CLOSED, s.getStatus()); + for (int i = 0; i < numWrites; i++) { + Future<WriteResponse> future = futureList.get(i); + WriteResponse wr = Await.result(future); + assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE, + StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode()); + } + } + + @Test(timeout = 60000) + public void testCloseTwice() throws Exception { + String streamName = testName.getMethodName(); + StreamImpl s = createUnstartedStream(service, streamName); + + int numWrites = 10; + 
List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); + for (int i = 0; i < numWrites; i++) { + WriteOp op = createWriteOp(service, streamName, i); + s.submit(op); + futureList.add(op.result()); + } + assertEquals(numWrites, s.numPendingOps()); + + Future<Void> closeFuture0 = s.requestClose("close 0"); + assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, + StreamStatus.CLOSING == s.getStatus() + || StreamStatus.CLOSED == s.getStatus()); + Future<Void> closeFuture1 = s.requestClose("close 1"); + assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, + StreamStatus.CLOSING == s.getStatus() + || StreamStatus.CLOSED == s.getStatus()); + + Await.result(closeFuture0); + assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, + StreamStatus.CLOSED, s.getStatus()); + Await.result(closeFuture1); + assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, + StreamStatus.CLOSED, s.getStatus()); + + for (int i = 0; i < numWrites; i++) { + Future<WriteResponse> future = futureList.get(i); + WriteResponse wr = Await.result(future); + assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE, + StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode()); + } + } + + @Test(timeout = 60000) + public void testFailRequestsDuringClosing() throws Exception { + String streamName = testName.getMethodName(); + StreamImpl s = createUnstartedStream(service, streamName); + + Future<Void> closeFuture = s.requestClose("close"); + assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, + StreamStatus.CLOSING == s.getStatus() + || StreamStatus.CLOSED == s.getStatus()); + WriteOp op1 = createWriteOp(service, streamName, 0L); + s.submit(op1); + WriteResponse response1 = Await.result(op1.result()); + assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closing", + 
StatusCode.STREAM_UNAVAILABLE, response1.getHeader().getCode()); + + Await.result(closeFuture); + assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, + StreamStatus.CLOSED, s.getStatus()); + WriteOp op2 = createWriteOp(service, streamName, 1L); + s.submit(op2); + WriteResponse response2 = Await.result(op2.result()); + assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closed", + StatusCode.STREAM_UNAVAILABLE, response2.getHeader().getCode()); + } + + @Test(timeout = 60000) + public void testServiceTimeout() throws Exception { + DistributedLogConfiguration confLocal = newLocalConf(); + confLocal.setOutputBufferSize(Integer.MAX_VALUE) + .setImmediateFlushEnabled(false) + .setPeriodicFlushFrequencyMilliSeconds(0); + ServerConfiguration serverConfLocal = newLocalServerConf(); + serverConfLocal.addConfiguration(serverConf); + serverConfLocal.setServiceTimeoutMs(200) + .setStreamProbationTimeoutMs(100); + String streamName = testName.getMethodName(); + // create a new service with 200ms timeout + DistributedLogServiceImpl localService = createService(serverConfLocal, confLocal); + StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); + + int numWrites = 10; + List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); + for (int i = 0; i < numWrites; i++) { + futureList.add(localService.write(streamName, createRecord(i))); + } + + assertTrue("Stream " + streamName + " should be cached", + streamManager.getCachedStreams().containsKey(streamName)); + + StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName); + // the stream should be set CLOSING + while (StreamStatus.CLOSING != s.getStatus() + && StreamStatus.CLOSED != s.getStatus()) { + TimeUnit.MILLISECONDS.sleep(20); + } + assertNotNull("Writer should be initialized", s.getWriter()); + assertNull("No exception should be thrown", s.getLastException()); + Future<Void> closeFuture 
= s.getCloseFuture(); + Await.result(closeFuture); + for (int i = 0; i < numWrites; i++) { + assertTrue("Write should not fail before closing", + futureList.get(i).isDefined()); + WriteResponse response = Await.result(futureList.get(i)); + assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION, + StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() + || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() + || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); + } + + while (streamManager.getCachedStreams().containsKey(streamName)) { + TimeUnit.MILLISECONDS.sleep(20); + } + + assertFalse("Stream should be removed from cache", + streamManager.getCachedStreams().containsKey(streamName)); + assertFalse("Stream should be removed from acquired cache", + streamManager.getAcquiredStreams().containsKey(streamName)); + + localService.shutdown(); + } + + private DistributedLogServiceImpl createConfiguredLocalService() throws Exception { + DistributedLogConfiguration confLocal = newLocalConf(); + confLocal.setOutputBufferSize(0) + .setImmediateFlushEnabled(true) + .setPeriodicFlushFrequencyMilliSeconds(0); + return createService(serverConf, confLocal); + } + + private ByteBuffer getTestDataBuffer() { + return ByteBuffer.wrap("test-data".getBytes()); + } + + @Test(timeout = 60000) + public void testNonDurableWrite() throws Exception { + DistributedLogConfiguration confLocal = newLocalConf(); + confLocal.setOutputBufferSize(Integer.MAX_VALUE) + .setImmediateFlushEnabled(false) + .setPeriodicFlushFrequencyMilliSeconds(0) + .setDurableWriteEnabled(false); + ServerConfiguration serverConfLocal = new ServerConfiguration(); + serverConfLocal.addConfiguration(serverConf); + serverConfLocal.enableDurableWrite(false); + serverConfLocal.setServiceTimeoutMs(Integer.MAX_VALUE) + .setStreamProbationTimeoutMs(Integer.MAX_VALUE); + String streamName = testName.getMethodName(); + DistributedLogServiceImpl localService = + 
createService(serverConfLocal, confLocal); + StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); + + int numWrites = 10; + List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(); + for (int i = 0; i < numWrites; i++) { + futureList.add(localService.write(streamName, createRecord(i))); + } + assertTrue("Stream " + streamName + " should be cached", + streamManager.getCachedStreams().containsKey(streamName)); + List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList)); + for (WriteResponse wr : resultList) { + assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn())); + } + + localService.shutdown(); + } + + @Test(timeout = 60000) + public void testWriteOpNoChecksum() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + WriteContext ctx = new WriteContext(); + Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + localService.shutdown(); + } + + @Test(timeout = 60000) + public void testTruncateOpNoChecksum() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + WriteContext ctx = new WriteContext(); + Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + localService.shutdown(); + } + + @Test(timeout = 60000) + public void testStreamOpNoChecksum() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + WriteContext ctx = new WriteContext(); + HeartbeatOptions option = new HeartbeatOptions(); + option.setSendHeartBeatToReader(true); + + // hearbeat to acquire the stream and then release the stream + Future<WriteResponse> result = 
localService.heartbeatWithOptions("test", ctx, option); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + result = localService.release("test", ctx); + resp = Await.result(result); + assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + + // heartbeat to acquire the stream and then delete the stream + result = localService.heartbeatWithOptions("test", ctx, option); + resp = Await.result(result); + assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + result = localService.delete("test", ctx); + resp = Await.result(result); + assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + + // shutdown the local service + localService.shutdown(); + } + + /** + * A write whose context carries an arbitrary CRC32 value (999) is expected to be + * answered with CHECKSUM_FAILED. + */ + @Test(timeout = 60000) + public void testWriteOpChecksumBadChecksum() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + WriteContext ctx = new WriteContext().setCrc32(999); + Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); + localService.shutdown(); + } + + /** + * The checksum is computed for stream "test" but the write is sent to stream "test1"; + * the response is expected to be CHECKSUM_FAILED. + */ + @Test(timeout = 60000) + public void testWriteOpChecksumBadStream() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + WriteContext ctx = new WriteContext().setCrc32( + ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array())); + Future<WriteResponse> result = localService.writeWithContext("test1", getTestDataBuffer(), ctx); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); + localService.shutdown(); + } + + /** + * The checksum is computed over the buffer, then the byte at index 1 is overwritten before + * the write is sent; the response is expected to be CHECKSUM_FAILED. + */ + @Test(timeout = 60000) + public void testWriteOpChecksumBadData() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + ByteBuffer buffer = getTestDataBuffer(); + WriteContext ctx = new WriteContext().setCrc32( + 
ProtocolUtils.writeOpCRC32("test", buffer.array())); + + // Overwrite 1 byte to corrupt data. + buffer.put(1, (byte) 0xab); + Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); + localService.shutdown(); + } + + /** + * heartbeat, release and delete issued with an arbitrary CRC32 value (999) are each + * expected to be answered with CHECKSUM_FAILED. + */ + @Test(timeout = 60000) + public void testStreamOpChecksumBadChecksum() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + WriteContext ctx = new WriteContext().setCrc32(999); + Future<WriteResponse> result = localService.heartbeat("test", ctx); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); + result = localService.release("test", ctx); + resp = Await.result(result); + assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); + result = localService.delete("test", ctx); + resp = Await.result(result); + assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); + localService.shutdown(); + } + + /** + * A truncate to DLSN(1, 2, 3) issued with an arbitrary CRC32 value (999) is expected to be + * answered with CHECKSUM_FAILED. + */ + @Test(timeout = 60000) + public void testTruncateOpChecksumBadChecksum() throws Exception { + DistributedLogServiceImpl localService = createConfiguredLocalService(); + WriteContext ctx = new WriteContext().setCrc32(999); + Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); + WriteResponse resp = Await.result(result); + assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); + localService.shutdown(); + } + + /** + * Builds a WriteOp whose payload is the bytes of "test" (obtained with getBytes() and no + * explicit charset), passing the remaining arguments straight to the WriteOp constructor. + * + * @param name stream name handed to the WriteOp + * @param disabledFeature feature handed to the WriteOp; the two tests that call this helper + * expect preExecute() to throw on a bad checksum while it is 0 and + * not to throw once it is set to 1 + * @param checksum checksum value handed to the WriteOp + * @return the newly constructed WriteOp + */ + private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long checksum) { + return new WriteOp(name, + ByteBuffer.wrap("test".getBytes()), + new NullStatsLogger(), + new NullStatsLogger(), + new IdentityStreamPartitionConverter(), + new ServerConfiguration(), + (byte) 0, + checksum, + false, + disabledFeature, + DefaultAccessControlManager.INSTANCE); + } + + /** + * With a checksum value (919191) that was not computed from the payload, preExecute() is + * expected to throw while the feature is 0 and to return normally after it is set to 1. + */ + @Test(timeout = 60000) + public void 
testStreamOpBadChecksumWithChecksumDisabled() throws Exception { + String streamName = testName.getMethodName(); + + SettableFeature disabledFeature = new SettableFeature("", 0); + + WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, 919191L); + WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, 919191L); + + try { + writeOp0.preExecute(); + fail("should have thrown"); + } catch (Exception ex) { + /* expected: preExecute() should throw here; the exception type is not checked */ + } + + disabledFeature.set(1); + writeOp1.preExecute(); + } + + /** + * With a checksum computed from the stream name and the payload, preExecute() is expected + * to return normally both while the feature is 1 and after it is set to 0. + */ + @Test(timeout = 60000) + public void testStreamOpGoodChecksumWithChecksumDisabled() throws Exception { + String streamName = testName.getMethodName(); + + SettableFeature disabledFeature = new SettableFeature("", 1); + WriteOp writeOp0 = getWriteOp( + streamName, + disabledFeature, + ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); + WriteOp writeOp1 = getWriteOp( + streamName, + disabledFeature, + ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); + + writeOp0.preExecute(); + disabledFeature.set(0); + writeOp1.preExecute(); + } + + /** + * Issues numWrites writes to each of numStreams streams on a service configured with + * output buffer size Integer.MAX_VALUE, immediate flush disabled and periodic flush + * frequency 0, then calls closeStreams(). Every write future is expected to complete with + * SUCCESS, WRITE_EXCEPTION or STREAM_UNAVAILABLE, and both the cached and the acquired + * stream collections are expected to be empty afterwards. + */ + @Test(timeout = 60000) + public void testCloseStreamsShouldFlush() throws Exception { + DistributedLogConfiguration confLocal = newLocalConf(); + confLocal.setOutputBufferSize(Integer.MAX_VALUE) + .setImmediateFlushEnabled(false) + .setPeriodicFlushFrequencyMilliSeconds(0); + + String streamNamePrefix = testName.getMethodName(); + DistributedLogServiceImpl localService = createService(serverConf, confLocal); + StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); + + int numStreams = 10; + int numWrites = 10; + List<Future<WriteResponse>> futureList = + Lists.newArrayListWithExpectedSize(numStreams * numWrites); + for (int i = 0; i < numStreams; i++) { + String streamName = streamNamePrefix + "-" + i; + HeartbeatOptions hbOptions = new HeartbeatOptions(); + hbOptions.setSendHeartBeatToReader(true); + // make sure the first log segment of each stream created + 
FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); + for (int j = 0; j < numWrites; j++) { + futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); + } + } + + assertEquals("There should be " + numStreams + " streams in cache", + numStreams, streamManager.getCachedStreams().size()); + /* poll until every stream is acquired; the loop has no bound of its own besides the @Test timeout */ + while (streamManager.getAcquiredStreams().size() < numStreams) { + TimeUnit.MILLISECONDS.sleep(20); + } + + Future<List<Void>> closeResult = localService.closeStreams(); + List<Void> closedStreams = Await.result(closeResult); + assertEquals("There should be " + numStreams + " streams closed", + numStreams, closedStreams.size()); + // all writes should be flushed + for (Future<WriteResponse> future : futureList) { + WriteResponse response = Await.result(future); + assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(), + StatusCode.SUCCESS == response.getHeader().getCode() + || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() + || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode()); + } + assertTrue("There should be no streams in the cache", + streamManager.getCachedStreams().isEmpty()); + assertTrue("There should be no streams in the acquired cache", + streamManager.getAcquiredStreams().isEmpty()); + + localService.shutdown(); + } + + /** + * Same setup as the flush test, but every acquired stream is put into StreamStatus.ERROR + * before closeStreams() is called. Every write future is then expected to complete with + * BK_TRANSMIT_ERROR, WRITE_EXCEPTION or WRITE_CANCELLED_EXCEPTION; the acquired streams are + * expected to be empty after the close and the cached streams after shutdown(). + */ + @Test(timeout = 60000) + public void testCloseStreamsShouldAbort() throws Exception { + DistributedLogConfiguration confLocal = newLocalConf(); + confLocal.setOutputBufferSize(Integer.MAX_VALUE) + .setImmediateFlushEnabled(false) + .setPeriodicFlushFrequencyMilliSeconds(0); + + String streamNamePrefix = testName.getMethodName(); + DistributedLogServiceImpl localService = createService(serverConf, confLocal); + StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); + + int numStreams = 10; + int numWrites = 10; + List<Future<WriteResponse>> futureList = + Lists.newArrayListWithExpectedSize(numStreams 
* numWrites); + for (int i = 0; i < numStreams; i++) { + String streamName = streamNamePrefix + "-" + i; + HeartbeatOptions hbOptions = new HeartbeatOptions(); + hbOptions.setSendHeartBeatToReader(true); + // make sure the first log segment of each stream created + FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); + for (int j = 0; j < numWrites; j++) { + futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); + } + } + + assertEquals("There should be " + numStreams + " streams in cache", + numStreams, streamManager.getCachedStreams().size()); + /* poll until every stream is acquired; the loop has no bound of its own besides the @Test timeout */ + while (streamManager.getAcquiredStreams().size() < numStreams) { + TimeUnit.MILLISECONDS.sleep(20); + } + + for (Stream s : streamManager.getAcquiredStreams().values()) { + StreamImpl stream = (StreamImpl) s; + stream.setStatus(StreamStatus.ERROR); + } + + Future<List<Void>> closeResult = localService.closeStreams(); + List<Void> closedStreams = Await.result(closeResult); + assertEquals("There should be " + numStreams + " streams closed", + numStreams, closedStreams.size()); + // no write should succeed: each one should fail or be rejected because its stream was put in ERROR state + for (Future<WriteResponse> future : futureList) { + WriteResponse response = Await.result(future); + assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : " + + response.getHeader().getCode(), + StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() + || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() + || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); + } + // acquired streams should all have been removed after we close them + assertTrue("There should be no streams in the acquired cache", + streamManager.getAcquiredStreams().isEmpty()); + localService.shutdown(); + // cached streams wouldn't be removed immediately after streams are closed + // but they should be removed after we shutdown the service + assertTrue("There should be no streams in the cache after shutting down 
the service", + streamManager.getCachedStreams().isEmpty()); + } + + /** + * After service.shutdown(), a write is expected to be answered with SERVICE_UNAVAILABLE and + * the stream manager is expected to hold no cached and no acquired streams. + */ + @Test(timeout = 60000) + public void testShutdown() throws Exception { + service.shutdown(); + StreamManagerImpl streamManager = (StreamManagerImpl) service.getStreamManager(); + WriteResponse response = + Await.result(service.write(testName.getMethodName(), createRecord(0L))); + assertEquals("Write should fail with " + StatusCode.SERVICE_UNAVAILABLE, + StatusCode.SERVICE_UNAVAILABLE, response.getHeader().getCode()); + assertTrue("There should be no streams created after shutdown", + streamManager.getCachedStreams().isEmpty()); + assertTrue("There should be no streams acquired after shutdown", + streamManager.getAcquiredStreams().isEmpty()); + } + + /** + * Registers "stream-0" with the local routing service, starts the placement policy, and + * checks that getOwner() answers FOUND with this service's address for "stream-1" and, + * after a write op has made the service acquire it, for "stream-2" as well. + */ + @Test(timeout = 60000) + public void testGetOwner() throws Exception { + ((LocalRoutingService) service.getRoutingService()) + .addHost("stream-0", service.getServiceAddress().getSocketAddress()) + .setAllowRetrySameHost(false); + + service.startPlacementPolicy(); + + WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext())); + assertEquals(StatusCode.FOUND, response.getHeader().getCode()); + assertEquals(service.getServiceAddress().toString(), + response.getHeader().getLocation()); + + // service cache "stream-2" + StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false); + // create write ops to stream-2 to make service acquire the stream + WriteOp op = createWriteOp(service, "stream-2", 0L); + stream.submit(op); + stream.start(); + WriteResponse wr = Await.result(op.result()); + assertEquals("Op should succeed", + StatusCode.SUCCESS, wr.getHeader().getCode()); + assertEquals("Service should acquire stream", + StreamStatus.INITIALIZED, stream.getStatus()); + assertNotNull(stream.getManager()); + assertNotNull(stream.getWriter()); + assertNull(stream.getLastException()); + + // the stream is acquired + response = FutureUtils.result(service.getOwner("stream-2", new 
WriteContext())); + assertEquals(StatusCode.FOUND, response.getHeader().getCode()); + assertEquals(service.getServiceAddress().toString(), + response.getHeader().getLocation()); + } + +}