This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch tsfileprocessortest_8181_port in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 76b0b8c63b4136bd7620ca9f742ee226db597120 Author: xiangdong huang <[email protected]> AuthorDate: Thu Feb 20 12:59:29 2020 +0800 enable mergeLog to investigate why merging hangs; add a stop timeout for port 8181; disable port 8181 during tests --- .travis.yml | 2 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++ .../apache/iotdb/db/engine/flush/FlushManager.java | 16 ++++- .../db/engine/storagegroup/TsFileProcessor.java | 79 ++++++++++++++++------ .../apache/iotdb/db/service/MetricsService.java | 5 +- .../storagegroup/StorageGroupProcessorTest.java | 1 + .../engine/storagegroup/TsFileProcessorTest.java | 50 ++++++++------ .../apache/iotdb/db/utils/EnvironmentUtils.java | 3 + server/src/test/resources/logback.xml | 4 +- 9 files changed, 122 insertions(+), 49 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6df9213..0b2cf4c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -140,7 +140,7 @@ matrix: - mvn -version # Output something every 10 minutes or Travis kills the job - while sleep 540; do echo "=====[ $SECONDS seconds still running ]====="; done & - - travis_wait 40 mvn -B clean integration-test + - travis_wait 20 mvn -B clean integration-test # Killing background sleep loop - kill %1 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 08eeb42..88a3e55 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -483,6 +483,9 @@ public class IoTDBConfig { //wait for 60 second by default.
private int thriftServerAwaitTimeForStopService = 60; + private boolean enableMetricsWebService = true; + + public IoTDBConfig() { // empty constructor } @@ -1353,4 +1356,12 @@ public class IoTDBConfig { public void setThriftServerAwaitTimeForStopService(int thriftServerAwaitTimeForStopService) { this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService; } + + public boolean isEnableMetricsWebService() { + return enableMetricsWebService; + } + + public void setEnableMetricsWebService(boolean enableMetricsWebService) { + this.enableMetricsWebService = enableMetricsWebService; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java index 8c224e0..147771e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java @@ -88,6 +88,7 @@ public class FlushManager implements FlushManagerMBean, IService { TsFileProcessor tsFileProcessor = tsFileProcessorQueue.poll(); tsFileProcessor.flushOneMemTable(); tsFileProcessor.setManagedByFlushManager(false); + logger.error("Flush Thread re-register TSProcessor {} to the queue.", tsFileProcessor.getStorageGroupName()); registerTsFileProcessor(tsFileProcessor); } } @@ -99,11 +100,16 @@ public class FlushManager implements FlushManagerMBean, IService { public void registerTsFileProcessor(TsFileProcessor tsFileProcessor) { synchronized (tsFileProcessor) { if (!tsFileProcessor.isManagedByFlushManager() && tsFileProcessor.getFlushingMemTableSize() > 0) { - logger.info("storage group {} begin to submit a flush thread, flushing memtable size: {}", - tsFileProcessor.getStorageGroupName(), tsFileProcessor.getFlushingMemTableSize()); tsFileProcessorQueue.add(tsFileProcessor); + logger.info("storage group {} begin to submit a flush thread, flushing memtable size: {}, queue size: {}", + 
tsFileProcessor.getStorageGroupName(), tsFileProcessor.getFlushingMemTableSize(), tsFileProcessorQueue.size()); tsFileProcessor.setManagedByFlushManager(true); flushPool.submit(new FlushThread()); + } else if (tsFileProcessor.isManagedByFlushManager()){ + logger.info("tsFileProcessor {} is already in the flushPool, the first one: {}", tsFileProcessor.getStorageGroupName(), + tsFileProcessorQueue.size() >0 ? tsFileProcessorQueue.getFirst().getStorageGroupName() : "empty now"); + } else { + logger.info("No flushing memetable to do, register TsProcessor {} failed.", tsFileProcessor.getStorageGroupName()); } } } @@ -122,4 +128,10 @@ public class FlushManager implements FlushManagerMBean, IService { private static FlushManager instance = new FlushManager(); } + + public String toString() { + return String.format("TSProcessors in the queue: %d, TaskPool size %d + %d,", + tsFileProcessorQueue.size(), flushPool.getWorkingTasksNumber(), + flushPool.getWaitingTasksNumber()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index d803cde..c818a3e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -116,7 +116,9 @@ public class TsFileProcessor { this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback; this.sequence = sequence; logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath()); - + if (this.writer == null) { + logger.error("the writer is null!! 
{}", tsfile.getName()); + } // a file generated by flush has only one historical version, which is itself this.tsFileResource .setHistoricalVersions(Collections.singleton(versionController.currVersion())); @@ -225,6 +227,7 @@ public class TsFileProcessor { */ public void deleteDataInMemory(Deletion deletion) { flushQueryLock.writeLock().lock(); + logger.error("get flushQueryLock write lock"); try { if (workMemTable != null) { workMemTable @@ -236,6 +239,7 @@ public class TsFileProcessor { } } finally { flushQueryLock.writeLock().unlock(); + logger.error("release flushQueryLock write lock"); } } @@ -286,9 +290,13 @@ public class TsFileProcessor { while (!flushingMemTables.isEmpty()) { flushingMemTables.wait(60_000); if (System.currentTimeMillis() - startTime > 60_000) { - logger.warn("{} has spent {}s for waiting flushing one memtable; {} left.", + logger.warn("{} has spent {}s for waiting flushing one memtable; {} left (first: {}). FlushingManager info: {}", this.tsFileResource.getFile().getAbsolutePath(), - (System.currentTimeMillis() - startTime)/1000,flushingMemTables.size()); + (System.currentTimeMillis() - startTime)/1000, + flushingMemTables.size(), + flushingMemTables.getFirst(), + FlushManager.getInstance() + ); } } @@ -303,6 +311,7 @@ public class TsFileProcessor { void asyncClose() { flushQueryLock.writeLock().lock(); + logger.error("get flushQueryLock write lock"); try { logger.info("Async close the file: {}", tsFileResource.getFile().getAbsolutePath()); if (shouldClose) { @@ -318,16 +327,16 @@ public class TsFileProcessor { // is set true, we need to generate a NotifyFlushMemTable as a signal task and submit it to // the FlushManager. IMemTable tmpMemTable = workMemTable == null ? 
new NotifyFlushMemTable() : workMemTable; - if (logger.isDebugEnabled()) { + //if (logger.isDebugEnabled()) { if (tmpMemTable.isSignalMemTable()) { - logger.debug( + logger.error( "storage group {} add a signal memtable into flushing memtable list when async close", storageGroupName); } else { logger .debug("storage group {} async flush a memtable when async close", storageGroupName); } - } + //} try { addAMemtableIntoFlushingList(tmpMemTable); } catch (IOException e) { @@ -335,39 +344,43 @@ public class TsFileProcessor { } } finally { flushQueryLock.writeLock().unlock(); + logger.error("release flushQueryLock write lock"); } } /** * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never wakeup + * Tips: I am trying to solve this issue by checking whether the table exist before wait() */ public void syncFlush() throws IOException { IMemTable tmpMemTable; flushQueryLock.writeLock().lock(); + logger.error("get flushQueryLock write lock"); try { tmpMemTable = workMemTable == null ? 
new NotifyFlushMemTable() : workMemTable; if (tmpMemTable.isSignalMemTable()) { - logger.debug("add a signal memtable into flushing memtable list when sync flush"); + logger.error("add a signal memtable into flushing memtable list when sync flush"); } addAMemtableIntoFlushingList(tmpMemTable); } finally { flushQueryLock.writeLock().unlock(); + logger.error("release flushQueryLock write lock"); } synchronized (tmpMemTable) { try { long startWait = System.currentTimeMillis(); - while (true) { + while (flushingMemTables.contains(tmpMemTable)) { tmpMemTable.wait(1000); - flushQueryLock.readLock().lock(); - try { - if (!flushingMemTables.contains(tmpMemTable)) { - break; - } - } finally { - flushQueryLock.readLock().unlock(); - } + //flushQueryLock.readLock().lock(); +// try { +// if (!flushingMemTables.contains(tmpMemTable)) { +// break; +// } +// } finally { +// //flushQueryLock.readLock().unlock(); +// } if ((System.currentTimeMillis() - startWait) > 60_000) { logger.warn("has waited for synced flushing a memtable in {} for 60 seconds.", @@ -387,6 +400,7 @@ public class TsFileProcessor { */ public void asyncFlush() { flushQueryLock.writeLock().lock(); + logger.error("get flushQueryLock write lock"); try { if (workMemTable == null) { return; @@ -398,6 +412,7 @@ public class TsFileProcessor { logger.error("WAL notify start flush failed", e); } finally { flushQueryLock.writeLock().unlock(); + logger.error("release flushQueryLock write lock"); } } @@ -411,6 +426,7 @@ public class TsFileProcessor { logger.error("Memetable info: " + tobeFlushed.getMemTableMap()); } flushingMemTables.addLast(tobeFlushed); + logger.error("Memtable (signal = {}) is added into the flushing Memtable", tobeFlushed.isSignalMemTable()); long cur = versionController.nextVersion(); tobeFlushed.setVersion(cur); if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { @@ -429,15 +445,30 @@ public class TsFileProcessor { */ private void releaseFlushedMemTable(IMemTable memTable) { 
flushQueryLock.writeLock().lock(); + logger.error("get flushQueryLock write lock"); try { + if (writer == null) { + logger.error("the writer is null, why?? memtable (signal = {})", memTable.isSignalMemTable()); + } writer.makeMetadataVisible(); - flushingMemTables.remove(memTable); + if (!flushingMemTables.remove(memTable)) { + logger.warn( + "put the memtable (signal={}) out of flushingMemtables but it is not in the queue.", + memTable.isSignalMemTable()); + } else { + logger.warn( + "memtable (signal={}) is removed from the queue. {} left.", + memTable.isSignalMemTable(), flushingMemTables.size()); + } memTable.release(); MemTablePool.getInstance().putBack(memTable, storageGroupName); logger.debug("storage group {} flush finished, remove a memtable from flushing list, " + "flushing memtable list size: {}", storageGroupName, flushingMemTables.size()); + } catch (Exception e) { + logger.error(e.getMessage(), e); } finally { flushQueryLock.writeLock().unlock(); + logger.error("release flushQueryLock write lock"); } } @@ -474,12 +505,13 @@ public class TsFileProcessor { getLogNode().notifyEndFlush(); } } - - releaseFlushedMemTable(memTableToFlush); - + logger.error("try get lock to release a memtable (signal={})", memTableToFlush.isSignalMemTable()); // for sync flush synchronized (memTableToFlush) { + logger.error("have got lock to release a memtable (signal={})", memTableToFlush.isSignalMemTable()); + releaseFlushedMemTable(memTableToFlush); memTableToFlush.notifyAll(); + logger.error("released a memtable (signal={}), flushingMemtables size ={}", memTableToFlush.isSignalMemTable(), flushingMemTables.size()); } if (shouldClose && flushingMemTables.isEmpty()) { @@ -500,6 +532,7 @@ public class TsFileProcessor { } catch (IOException e) { logger.error("update compression ratio failed", e); } + logger.error("flushing Memtable is empty and will close the file"); endFile(); } catch (IOException | TsFileProcessorException e) { logger.error("meet error when flush 
FileMetadata to {}, change system mode to read-only", @@ -522,7 +555,6 @@ public class TsFileProcessor { private void endFile() throws IOException, TsFileProcessorException { long closeStartTime = System.currentTimeMillis(); - tsFileResource.serialize(); writer.endFile(schema); tsFileResource.cleanCloseFlag(); @@ -542,6 +574,7 @@ public class TsFileProcessor { DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime), closeEndTime - closeStartTime); } + logger.error("Storage group {} close the file {}", storageGroupName, tsFileResource.getFile().getAbsoluteFile()); } @@ -603,6 +636,7 @@ public class TsFileProcessor { String measurementId, TSDataType dataType, TSEncoding encoding, Map<String, String> props, QueryContext context) { flushQueryLock.readLock().lock(); + logger.error("get flushQueryLock read lock"); try { List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>(); for (IMemTable flushingMemTable : flushingMemTables) { @@ -635,10 +669,11 @@ public class TsFileProcessor { chunkMetaDataList.removeIf(context::chunkNotSatisfy); return new Pair<>(readOnlyMemChunks, chunkMetaDataList); - } catch (IOException | QueryProcessException e) { + } catch (Exception e) { logger.error("get ReadOnlyMemChunk has error", e); } finally { flushQueryLock.readLock().unlock(); + logger.error("release flushQueryLock read lock"); } return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java b/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java index 3456c75..6994886 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java @@ -80,6 +80,9 @@ public class MetricsService implements MetricsServiceMBean, IService { @Override public synchronized void startService() throws StartupException { + if (!IoTDBDescriptor.getInstance().getConfig().isEnableMetricsWebService()) { + return; + } logger.info("{}: start {}...", 
IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); executorService = Executors.newSingleThreadExecutor(); int port = getMetricsPort(); @@ -88,7 +91,7 @@ public class MetricsService implements MetricsServiceMBean, IService { metricsWebUI.getHandlers().add(metricsSystem.getServletHandlers()); metricsWebUI.initialize(); server = metricsWebUI.getServer(port); - server.setStopTimeout(5000); + server.setStopTimeout(10000); metricsSystem.start(); try { executorService.execute(new MetricsServiceThread(server)); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 2d179b3..fdf8c23 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -78,6 +78,7 @@ public class StorageGroupProcessorTest { EnvironmentUtils.cleanEnv(); EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR); MergeManager.getINSTANCE().stop(); + EnvironmentUtils.cleanEnv(); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 6e732b8..62efc1d 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -52,6 +52,8 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TsFileProcessorTest { @@ -65,7 +67,7 @@ public class TsFileProcessorTest { private TSEncoding encoding = TSEncoding.RLE; private Map<String, String> props = Collections.emptyMap(); private QueryContext context; - + 
private static Logger logger = LoggerFactory.getLogger(TsFileProcessorTest.class); @Before public void setUp() throws Exception { EnvironmentUtils.envSetUp(); @@ -82,9 +84,9 @@ public class TsFileProcessorTest { @Test public void testWriteAndFlush() throws IOException, QueryProcessException { + logger.error("testWriteAndFlush begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), - SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, x -> { - }, + SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, (tsFileProcessor) -> true, true); Pair<List<ReadOnlyMemChunk>, List<ChunkMetaData>> pair = processor @@ -130,9 +132,9 @@ public class TsFileProcessorTest { @Test public void testWriteAndRestoreMetadata() throws IOException, QueryProcessException { + logger.error("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), - SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, x -> { - }, + SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, (tsFileProcessor) -> true, true); Pair<List<ReadOnlyMemChunk>, List<ChunkMetaData>> pair = processor @@ -163,7 +165,7 @@ public class TsFileProcessorTest { assertEquals(num, timeValuePair.getValue().getInt()); } } - + logger.error("syncFlush.."); // flush synchronously processor.syncFlush(); @@ -193,15 +195,17 @@ public class TsFileProcessorTest { } } restorableTsFileIOWriter.close(); + logger.error("syncClose.."); processor.syncClose(); + //we need to close the tsfile writer first and then reopen it. 
} @Test public void testMultiFlush() throws IOException, QueryProcessException { + logger.error("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), - SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, x -> { - }, + SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, (tsFileProcessor) -> true, true); Pair<List<ReadOnlyMemChunk>, List<ChunkMetaData>> pair = processor @@ -234,22 +238,10 @@ public class TsFileProcessorTest { @Test public void testWriteAndClose() throws IOException, QueryProcessException { + logger.error("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, - unsealedTsFileProcessor -> { - TsFileResource resource = unsealedTsFileProcessor.getTsFileResource(); - synchronized (resource) { - for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) { - String deviceId = startTime.getKey(); - resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId)); - } - try { - resource.close(); - } catch (IOException e) { - throw new TsFileProcessorException(e); - } - } - }, (tsFileProcessor) -> true, true); + this::closeTsFileProcessor, (tsFileProcessor) -> true, true); Pair<List<ReadOnlyMemChunk>, List<ChunkMetaData>> pair = processor .query(deviceId, measurementId, dataType, encoding, props, context); @@ -286,4 +278,18 @@ public class TsFileProcessorTest { assertTrue(processor.getTsFileResource().isClosed()); } + private void closeTsFileProcessor(TsFileProcessor unsealedTsFileProcessor) throws TsFileProcessorException { + TsFileResource resource = unsealedTsFileProcessor.getTsFileResource(); + synchronized (resource) { + for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) { + String deviceId = 
startTime.getKey(); + resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId)); + } + try { + resource.close(); + } catch (IOException e) { + throw new TsFileProcessorException(e); + } + } + } } \ No newline at end of file diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index 871506f..f436b3d 100644 --- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -78,6 +78,7 @@ public class EnvironmentUtils { private static IoTDB daemon; public static void cleanEnv() throws IOException, StorageEngineException { + logger.warn("EnvironmentUtil cleanEnv..."); if (daemon != null) { daemon.stop(); daemon = null; @@ -191,6 +192,8 @@ public class EnvironmentUtils { logger.warn("EnvironmentUtil setup..."); System.setProperty(IoTDBConstant.REMOTE_JMX_PORT_NAME, "31999"); IoTDBDescriptor.getInstance().getConfig().setThriftServerAwaitTimeForStopService(0); + //we do not start 8181 port in test. + IoTDBDescriptor.getInstance().getConfig().setEnableMetricsWebService(false); if (daemon == null) { daemon = new IoTDB(); } diff --git a/server/src/test/resources/logback.xml b/server/src/test/resources/logback.xml index 8e61f7d..5b16f61 100644 --- a/server/src/test/resources/logback.xml +++ b/server/src/test/resources/logback.xml @@ -41,8 +41,10 @@ <!-- enable me if you want to monitor when files are opened and closed. <logger name="FileMonitor" level="info"/> --> - <logger name="FileMonitor" level="info"/> + <logger name="org.apache.iotdb.db.engine.merge" level="DEBUG"/> + <logger name="org.eclipse.jetty.util.thread.QueuedThreadPool" level="DEBUG"/> <logger name="org.apache.iotdb.db.service.MetricsService" level="INFO" /> + <logger name="org.apache.iotdb.db.engine.flush.FlushManager" level="INFO" /> <root level="WARN"> <appender-ref ref="stdout"/> </root>
