ArvinDevel commented on issue #202: DistributedLogManager's getLastDLSN runs long time, especially after reOpen log stream URL: https://github.com/apache/distributedlog/issues/202#issuecomment-334655739 Below is my unit test case, which often fails. `package org.apache.distributedlog; import static org.junit.Assert.assertEquals; import java.net.URI; import java.util.Optional; import java.util.concurrent.CountDownLatch; import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.exceptions.LogEmptyException; import org.junit.Test; /** * 2017/10/6. */ public class TestReOpen extends TestDistributedLogBase{ DistributedLogManager dlm = null; AsyncLogWriter logWriter = null; volatile DLSN last = null; Namespace dlNamespace = null; @Test public void testReadWritewithReOpen() throws Exception { String name = "testReadWritewithReOpen"; DistributedLogConfiguration dlconfig = new DistributedLogConfiguration(); DistributedLogConfiguration streamConfig = new DistributedLogConfiguration(); URI namespaceUri = null; CountDownLatch openLatch = new CountDownLatch(1); CountDownLatch writeLatch = new CountDownLatch(1); CountDownLatch closeLatch = new CountDownLatch(1); CountDownLatch doneLatch = new CountDownLatch(1); streamConfig.setLogSegmentRollingIntervalMinutes(10); streamConfig.setMaxLogSegmentBytes(2 * 1024 * 1024); streamConfig.setRetentionPeriodHours(1); try { namespaceUri = createDLMURI("/default_namespace"); ensureURICreated(namespaceUri); LOG.info("created DLM URI {} succeed ", namespaceUri.toString()); } catch (Exception ioe){ LOG.info("create DLM URI error {}", ioe.toString()); } //initialize dl namespace //set dlog transmit outputBuffer size to 0, entry will have only one record. 
dlconfig.setOutputBufferSize(0); try { dlNamespace = NamespaceBuilder.newBuilder() .conf(dlconfig) .uri(namespaceUri) .build(); } catch (Exception e){ LOG.error("[{}] Got exception while trying to initialize dlog namespace, uri is {}", namespaceUri, e); } if (dlNamespace.logExists(name)) { dlm = dlNamespace.openLog(name, Optional.of(streamConfig), Optional.empty(), Optional.empty()); } else { dlNamespace.createLog(name); dlm = dlNamespace.openLog(name, Optional.of(streamConfig), Optional.empty(), Optional.empty()); } dlm.openAsyncLogWriter().whenComplete(new FutureEventListener<AsyncLogWriter>() { @Override public void onSuccess(AsyncLogWriter asyncLogWriter) { LOG.info("[{}] Created log writer {}", name, asyncLogWriter.toString()); logWriter = asyncLogWriter; try { LOG.info("before getLastDLSN"); last = dlm.getLastDLSN(); LOG.info("after getLastDLSN"); } catch (LogEmptyException lee){ LOG.info("the log stream is empty "); } catch (Exception e){ LOG.error("Faced Exception in getLastDLSN", e); } LOG.info("getLastDLSN return {}", last); openLatch.countDown(); } @Override public void onFailure(Throwable throwable) { LOG.error("Failed open AsyncLogWriter for {}", name, throwable); openLatch.countDown(); } }); openLatch.await(); logWriter.write(new LogRecord(System.currentTimeMillis(), "thisIsTheRecord".getBytes())).whenComplete(new FutureEventListener<DLSN>(){ @Override public void onSuccess(DLSN dlsn) { LOG.info("[{}] write-complete: dlsn={}", this, dlsn); writeLatch.countDown(); } @Override public void onFailure(Throwable throwable) { LOG.info("[{}] write-fail: throwable={}", this, throwable); } }); writeLatch.await(); dlm.asyncClose().whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { closeLatch.countDown(); } @Override public void onFailure(Throwable cause) { } }); closeLatch.await(); // ReOpen the DLM and get lastDLSN if (dlNamespace.logExists(name)) { dlm = dlNamespace.openLog(name, Optional.of(streamConfig), 
Optional.empty(), Optional.empty()); } else { dlNamespace.createLog(name); dlm = dlNamespace.openLog(name, Optional.of(streamConfig), Optional.empty(), Optional.empty()); } dlm.openAsyncLogWriter().whenComplete(new FutureEventListener<AsyncLogWriter>() { @Override public void onSuccess(AsyncLogWriter asyncLogWriter) { LOG.info("[{}] Created log writer again {}", name, asyncLogWriter.toString()); try { LOG.info("before getLastDLSN again"); last = dlm.getLastDLSN(); LOG.info("after getLastDLSN again"); } catch (LogEmptyException lee){ LOG.info("the log stream is empty "); } catch (Exception e){ LOG.error("Faced Exception in getLastDLSN", e); } LOG.info("getLastDLSN return {}", last); doneLatch.countDown(); } @Override public void onFailure(Throwable throwable) { LOG.error("Failed open AsyncLogWriter for {}", name, throwable); doneLatch.countDown(); } }); doneLatch.await(); assertEquals(last, new DLSN(1, 0, 0)); dlm.close(); dlNamespace.close(); } } ` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
With regards, Apache Git Services
