ArvinDevel commented on issue #202: DistributedLogManager's getLastDLSN runs 
long time, especially after reOpen log stream
URL: https://github.com/apache/distributedlog/issues/202#issuecomment-334655739
 
 
   Below is my unit test case, which often fails.
   `package org.apache.distributedlog;
   
   import static org.junit.Assert.assertEquals;
   
   import java.net.URI;
   import java.util.Optional;
   import java.util.concurrent.CountDownLatch;
   import org.apache.distributedlog.api.AsyncLogWriter;
   import org.apache.distributedlog.api.DistributedLogManager;
   import org.apache.distributedlog.api.namespace.Namespace;
   import org.apache.distributedlog.api.namespace.NamespaceBuilder;
   import org.apache.distributedlog.common.concurrent.FutureEventListener;
   import org.apache.distributedlog.exceptions.LogEmptyException;
   import org.junit.Test;
   
   
   
   
   
   /**
    *  2017/10/6.
    */
   public class TestReOpen extends TestDistributedLogBase{
       DistributedLogManager dlm = null;
       AsyncLogWriter logWriter = null;
       volatile DLSN last = null;
       Namespace dlNamespace = null;
       @Test
       public void testReadWritewithReOpen() throws Exception {
           String name = "testReadWritewithReOpen";
           DistributedLogConfiguration dlconfig = new 
DistributedLogConfiguration();
           DistributedLogConfiguration streamConfig = new 
DistributedLogConfiguration();
           URI namespaceUri = null;
           CountDownLatch openLatch = new CountDownLatch(1);
           CountDownLatch writeLatch = new CountDownLatch(1);
           CountDownLatch closeLatch = new CountDownLatch(1);
           CountDownLatch doneLatch = new CountDownLatch(1);
   
           streamConfig.setLogSegmentRollingIntervalMinutes(10);
           streamConfig.setMaxLogSegmentBytes(2 * 1024 * 1024);
           streamConfig.setRetentionPeriodHours(1);
   
           try {
               namespaceUri = createDLMURI("/default_namespace");
               ensureURICreated(namespaceUri);
               LOG.info("created DLM URI {} succeed ", namespaceUri.toString());
           } catch (Exception ioe){
               LOG.info("create DLM URI error {}", ioe.toString());
           }
           //initialize dl namespace
           //set dlog transmit outputBuffer size to 0, entry will have only one 
record.
           dlconfig.setOutputBufferSize(0);
           try {
               dlNamespace = NamespaceBuilder.newBuilder()
                       .conf(dlconfig)
                       .uri(namespaceUri)
                       .build();
   
           } catch (Exception e){
               LOG.error("[{}] Got exception while trying to initialize dlog 
namespace, uri is {}", namespaceUri, e);
           }
           if (dlNamespace.logExists(name)) {
               dlm = dlNamespace.openLog(name, Optional.of(streamConfig), 
Optional.empty(), Optional.empty());
           } else {
               dlNamespace.createLog(name);
               dlm = dlNamespace.openLog(name, Optional.of(streamConfig), 
Optional.empty(), Optional.empty());
           }
   
           dlm.openAsyncLogWriter().whenComplete(new 
FutureEventListener<AsyncLogWriter>() {
               @Override
               public void onSuccess(AsyncLogWriter asyncLogWriter) {
                   LOG.info("[{}] Created log writer {}", name, 
asyncLogWriter.toString());
                   logWriter = asyncLogWriter;
                   try {
                       LOG.info("before getLastDLSN");
                       last = dlm.getLastDLSN();
                       LOG.info("after getLastDLSN");
                   } catch (LogEmptyException lee){
                       LOG.info("the log stream is empty ");
                   } catch (Exception e){
                       LOG.error("Faced Exception in getLastDLSN", e);
                   }
                   LOG.info("getLastDLSN return {}", last);
                   openLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.error("Failed open AsyncLogWriter for {}", name, 
throwable);
                   openLatch.countDown();
               }
           });
   
           openLatch.await();
           logWriter.write(new LogRecord(System.currentTimeMillis(),
                   "thisIsTheRecord".getBytes())).whenComplete(new 
FutureEventListener<DLSN>(){
               @Override
               public void onSuccess(DLSN dlsn) {
                    LOG.info("[{}] write-complete: dlsn={}", this, dlsn);
                    writeLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.info("[{}] write-fail: throwable={}", this, throwable);
   
               }
           });
           writeLatch.await();
           dlm.asyncClose().whenComplete(new FutureEventListener<Void>() {
               @Override
               public void onSuccess(Void value) {
                   closeLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable cause) {
   
               }
           });
           closeLatch.await();
   
           // ReOpen the DLM and get lastDLSN
           if (dlNamespace.logExists(name)) {
               dlm = dlNamespace.openLog(name, Optional.of(streamConfig), 
Optional.empty(), Optional.empty());
           } else {
               dlNamespace.createLog(name);
               dlm = dlNamespace.openLog(name, Optional.of(streamConfig), 
Optional.empty(), Optional.empty());
           }
   
           dlm.openAsyncLogWriter().whenComplete(new 
FutureEventListener<AsyncLogWriter>() {
               @Override
               public void onSuccess(AsyncLogWriter asyncLogWriter) {
                   LOG.info("[{}] Created log writer again {}", name, 
asyncLogWriter.toString());
                   try {
                       LOG.info("before getLastDLSN again");
                       last = dlm.getLastDLSN();
                       LOG.info("after getLastDLSN again");
                   } catch (LogEmptyException lee){
                       LOG.info("the log stream is empty ");
                   } catch (Exception e){
                       LOG.error("Faced Exception in getLastDLSN", e);
                   }
                   LOG.info("getLastDLSN return {}", last);
                   doneLatch.countDown();
               }
   
               @Override
               public void onFailure(Throwable throwable) {
                   LOG.error("Failed open AsyncLogWriter for {}", name, 
throwable);
                   doneLatch.countDown();
               }
           });
           doneLatch.await();
           assertEquals(last, new DLSN(1, 0, 0));
           dlm.close();
           dlNamespace.close();
       }
   
   }
   `
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to