ArvinDevel commented on issue #203: Get last Record can't finish after write succeed URL: https://github.com/apache/distributedlog/issues/203#issuecomment-335670140 `package org.apache.distributedlog; import org.apache.distributedlog.api.AsyncLogReader; import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.junit.Test; import java.net.URI; import java.util.Optional; import java.util.concurrent.CountDownLatch; /** * TestReadLast. */ public class TestReadLast extends TestDistributedLogBase{ Namespace dlNamespace = null; DistributedLogManager dlm = null; AsyncLogWriter logWriter = null; AsyncLogReader logReader = null; AsyncLogReader logReader2 = null; DLSN first = null; DLSN second = null; @Test public void testReadLast() throws Exception { String name = "testReadLast"; DistributedLogConfiguration dlconfig = new DistributedLogConfiguration(); URI namespaceUri = null; CountDownLatch openLatch = new CountDownLatch(1); CountDownLatch writeLatch = new CountDownLatch(1); CountDownLatch writeLatch2 = new CountDownLatch(1); CountDownLatch openReaderLatch = new CountDownLatch(1); CountDownLatch readLatch = new CountDownLatch(1); CountDownLatch openReaderLatch2 = new CountDownLatch(1); CountDownLatch readLatch2 = new CountDownLatch(1); try { namespaceUri = createDLMURI("/default_namespace"); ensureURICreated(namespaceUri); LOG.info("created DLM URI {} succeed ", namespaceUri.toString()); } catch (Exception ioe){ LOG.info("create DLM URI error {}", ioe.toString()); } //initialize dl namespace //set dlog transmit outputBuffer size to 0, entry will have only one record. dlconfig.setOutputBufferSize(0); try { dlNamespace = NamespaceBuilder.newBuilder() .conf(dlconfig) .uri(namespaceUri) .build(); } catch (Exception e){ LOG.error("[{}] Got exception while trying to initialize dlog namespace, uri is {}", namespaceUri, e); } if (dlNamespace.logExists(name)) { dlm = dlNamespace.openLog(name); } else { dlNamespace.createLog(name); dlm = dlNamespace.openLog(name); } dlm.openAsyncLogWriter().whenComplete(new FutureEventListener<AsyncLogWriter>() { @Override public void onSuccess(AsyncLogWriter asyncLogWriter) { LOG.info("[{}] Created log writer {}", name, asyncLogWriter.toString()); logWriter = asyncLogWriter; openLatch.countDown(); } @Override public void onFailure(Throwable throwable) { LOG.error("Failed open AsyncLogWriter for {}", name, throwable); openLatch.countDown(); } }); openLatch.await(); logWriter.write(new LogRecord(System.currentTimeMillis(), "thisIsTheRecord".getBytes())).whenComplete(new FutureEventListener<DLSN>(){ @Override public void onSuccess(DLSN dlsn) { first = dlsn; LOG.info("[{}] write-complete: dlsn={}", this, dlsn); writeLatch.countDown(); } @Override public void onFailure(Throwable throwable) { LOG.info("[{}] write-fail: throwable={}", this, throwable); writeLatch.countDown(); } }); writeLatch.await(); // write again logWriter.write(new LogRecord(System.currentTimeMillis(), "thisIsTheSecondRecord".getBytes())).whenComplete(new FutureEventListener<DLSN>(){ @Override public void onSuccess(DLSN dlsn) { LOG.info("[{}] write-complete: dlsn={}", this, dlsn); second = dlsn; writeLatch2.countDown(); } @Override public void onFailure(Throwable throwable) { LOG.info("[{}] write-fail: throwable={}", this, throwable); writeLatch2.countDown(); } }); writeLatch2.await(); // use dlm read last is not ok LOG.info("getLastLogRecord return {}", new String(dlm.getLastLogRecord().getPayload())); // even sleep can't get latest record Thread.sleep(1000); LOG.info("getLastLogRecord again return {}", new String(dlm.getLastLogRecord().getPayload())); // use logReader to read first is ok dlm.openAsyncLogReader(first).whenComplete(new FutureEventListener<AsyncLogReader>() { @Override public void onSuccess(AsyncLogReader value) { logReader = value; openReaderLatch.countDown(); } @Override public void onFailure(Throwable cause) { openReaderLatch.countDown(); } }); openReaderLatch.await(); logReader.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) { LOG.info("getLastLogRecord in logReader return {}", new String(logRecordWithDLSN.getPayload())); readLatch.countDown(); logReader.asyncClose(); } @Override public void onFailure(Throwable throwable) { readLatch.countDown(); logReader.asyncClose(); } }); readLatch.await(); // use logReader to read second is not ok dlm.openAsyncLogReader(second).whenComplete(new FutureEventListener<AsyncLogReader>() { @Override public void onSuccess(AsyncLogReader value) { logReader2 = value; openReaderLatch2.countDown(); } @Override public void onFailure(Throwable cause) { openReaderLatch2.countDown(); } }); openReaderLatch2.await(); logReader2.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) { LOG.info("getLastLogRecord in logReader return {}", new String(logRecordWithDLSN.getPayload())); readLatch2.countDown(); logReader2.asyncClose(); } @Override public void onFailure(Throwable throwable) { readLatch2.countDown(); logReader2.asyncClose(); } }); readLatch2.await(); dlm.close(); dlNamespace.close(); } } ` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
With regards, Apache Git Services
