ArvinDevel commented on issue #203: Get last Record can't finish after write 
succeed
URL: https://github.com/apache/distributedlog/issues/203#issuecomment-335670140
 
 
   `package org.apache.distributedlog;
   
   import org.apache.distributedlog.api.AsyncLogReader;
   import org.apache.distributedlog.api.AsyncLogWriter;
   import org.apache.distributedlog.api.DistributedLogManager;
   import org.apache.distributedlog.api.namespace.Namespace;
   import org.apache.distributedlog.api.namespace.NamespaceBuilder;
   import org.apache.distributedlog.common.concurrent.FutureEventListener;
   import org.junit.Test;
   
   import java.net.URI;
   import java.util.Optional;
   import java.util.concurrent.CountDownLatch;
   
   
   /**
    *  TestReadLast.
    */
   public class TestReadLast extends TestDistributedLogBase{
       Namespace dlNamespace = null;
       DistributedLogManager dlm = null;
       AsyncLogWriter logWriter = null;
       AsyncLogReader logReader = null;
       AsyncLogReader logReader2 = null;
       DLSN first = null;
       DLSN second = null;
   
       @Test
       public void testReadLast() throws Exception {
   
           String name = "testReadLast";
           DistributedLogConfiguration dlconfig = new 
DistributedLogConfiguration();
           URI namespaceUri = null;
           CountDownLatch openLatch = new CountDownLatch(1);
           CountDownLatch writeLatch = new CountDownLatch(1);
           CountDownLatch writeLatch2 = new CountDownLatch(1);
           CountDownLatch openReaderLatch = new CountDownLatch(1);
           CountDownLatch readLatch = new CountDownLatch(1);
           CountDownLatch openReaderLatch2 = new CountDownLatch(1);
           CountDownLatch readLatch2 = new CountDownLatch(1);
   
           try {
               namespaceUri = createDLMURI("/default_namespace");
               ensureURICreated(namespaceUri);
               LOG.info("created DLM URI {} succeed ", namespaceUri.toString());
           } catch (Exception ioe){
               LOG.info("create DLM URI error {}", ioe.toString());
           }
           //initialize dl namespace
           //set dlog transmit outputBuffer size to 0, entry will have only one 
record.
           dlconfig.setOutputBufferSize(0);
           try {
               dlNamespace = NamespaceBuilder.newBuilder()
                       .conf(dlconfig)
                       .uri(namespaceUri)
                       .build();
   
           } catch (Exception e){
               LOG.error("[{}] Got exception while trying to initialize dlog 
namespace, uri is {}", namespaceUri, e);
           }
           if (dlNamespace.logExists(name)) {
               dlm = dlNamespace.openLog(name);
           } else {
               dlNamespace.createLog(name);
               dlm = dlNamespace.openLog(name);
           }
   
           dlm.openAsyncLogWriter().whenComplete(new 
FutureEventListener<AsyncLogWriter>() {
               @Override
               public void onSuccess(AsyncLogWriter asyncLogWriter) {
                   LOG.info("[{}] Created log writer {}", name, 
asyncLogWriter.toString());
                   logWriter = asyncLogWriter;
                   openLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.error("Failed open AsyncLogWriter for {}", name, 
throwable);
                   openLatch.countDown();
               }
           });
           openLatch.await();
   
           logWriter.write(new LogRecord(System.currentTimeMillis(),
                   "thisIsTheRecord".getBytes())).whenComplete(new 
FutureEventListener<DLSN>(){
               @Override
               public void onSuccess(DLSN dlsn) {
                   first = dlsn;
                   LOG.info("[{}] write-complete: dlsn={}", this, dlsn);
                   writeLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.info("[{}] write-fail: throwable={}", this, throwable);
                   writeLatch.countDown();
   
               }
           });
           writeLatch.await();
   
   
           // write again
           logWriter.write(new LogRecord(System.currentTimeMillis(),
                   "thisIsTheSecondRecord".getBytes())).whenComplete(new 
FutureEventListener<DLSN>(){
               @Override
               public void onSuccess(DLSN dlsn) {
                   LOG.info("[{}] write-complete: dlsn={}", this, dlsn);
                   second = dlsn;
                   writeLatch2.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.info("[{}] write-fail: throwable={}", this, throwable);
                   writeLatch2.countDown();
   
               }
           });
   
           writeLatch2.await();
   
           // use dlm read last is not ok
           LOG.info("getLastLogRecord return {}", new 
String(dlm.getLastLogRecord().getPayload()));
           // even sleep can't get latest record
           Thread.sleep(1000);
           LOG.info("getLastLogRecord again return {}", new 
String(dlm.getLastLogRecord().getPayload()));
   
           // use logReader to read first is ok
           dlm.openAsyncLogReader(first).whenComplete(new 
FutureEventListener<AsyncLogReader>() {
               @Override
               public void onSuccess(AsyncLogReader value) {
                   logReader = value;
                   openReaderLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable cause) {
                   openReaderLatch.countDown();
   
               }
           });
           openReaderLatch.await();
           logReader.readNext().whenComplete(new 
FutureEventListener<LogRecordWithDLSN>() {
               @Override
               public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                   LOG.info("getLastLogRecord in logReader return {}", new 
String(logRecordWithDLSN.getPayload()));
                   readLatch.countDown();
                   logReader.asyncClose();
               }
               @Override
               public void onFailure(Throwable throwable) {
                   readLatch.countDown();
                   logReader.asyncClose();
               }
           });
           readLatch.await();
   
           // use logReader to read second is not ok
           dlm.openAsyncLogReader(second).whenComplete(new 
FutureEventListener<AsyncLogReader>() {
               @Override
               public void onSuccess(AsyncLogReader value) {
                   logReader2 = value;
                   openReaderLatch2.countDown();
               }
   
               @Override
               public void onFailure(Throwable cause) {
                   openReaderLatch2.countDown();
   
               }
           });
           openReaderLatch2.await();
           logReader2.readNext().whenComplete(new 
FutureEventListener<LogRecordWithDLSN>() {
               @Override
               public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                   LOG.info("getLastLogRecord in logReader return {}", new 
String(logRecordWithDLSN.getPayload()));
                   readLatch2.countDown();
                   logReader2.asyncClose();
               }
               @Override
               public void onFailure(Throwable throwable) {
                   readLatch2.countDown();
                   logReader2.asyncClose();
               }
           });
           readLatch2.await();
   
           dlm.close();
           dlNamespace.close();
       }
   }
   `
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to