This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch fix_add_future_4_flush_opeartion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to
refs/heads/fix_add_future_4_flush_opeartion by this push:
new 45b4ccb replace FlushStatus by Future
45b4ccb is described below
commit 45b4ccb4bdbe4892af3a406c022291d42fa20add
Author: xiangdong huang <[email protected]>
AuthorDate: Tue Feb 12 21:56:43 2019 +0800
replace FlushStatus by Future
---
.../engine/bufferwrite/BufferWriteProcessor.java | 128 ++++++++---------
.../db/engine/overflow/io/OverflowProcessor.java | 157 +++++++++++----------
.../apache/iotdb/db/engine/utils/FlushStatus.java | 45 ------
.../apache/iotdb/db/qp/constant/DatetimeUtils.java | 6 +
.../bufferwrite/BufferWriteProcessorNewTest.java | 14 +-
.../bufferwrite/BufferWriteProcessorTest.java | 16 ++-
.../engine/overflow/io/OverflowProcessorTest.java | 1 -
7 files changed, 176 insertions(+), 191 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2577ebf..f8d7153 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.engine.bufferwrite;
import java.io.File;
import java.io.IOException;
-import java.time.Instant;
-import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -40,8 +38,8 @@ import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -61,9 +59,7 @@ public class BufferWriteProcessor extends Processor {
private static final Logger LOGGER =
LoggerFactory.getLogger(BufferWriteProcessor.class);
private RestorableTsFileIOWriter writer;
private FileSchema fileSchema;
- private volatile FlushStatus flushStatus = new FlushStatus();
- private volatile Future<Boolean> flushFuture;
- private volatile boolean isFlush;
+ private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
private ReentrantLock flushQueryLock = new ReentrantLock();
private AtomicLong memSize = new AtomicLong();
private long memThreshold =
TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
@@ -224,7 +220,7 @@ public class BufferWriteProcessor extends Processor {
flushQueryLock.lock();
try {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (isFlush) {
+ if (flushMemTable != null) {
memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId,
measurementId, dataType));
}
memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId,
measurementId, dataType));
@@ -244,7 +240,6 @@ public class BufferWriteProcessor extends Processor {
workMemTable = new PrimitiveMemTable();
}
} finally {
- isFlush = true;
flushQueryLock.unlock();
}
}
@@ -256,11 +251,15 @@ public class BufferWriteProcessor extends Processor {
flushMemTable = null;
writer.appendMetadata();
} finally {
- isFlush = false;
flushQueryLock.unlock();
}
}
+ /**
+ * the caller must guarantee no other concurrent caller entering this
function.
+ * @param displayMessage message that will appear in system log.
+ * @return true if the flush succeeded.
+ */
private boolean flushTask(String displayMessage) {
boolean result;
long flushStartTime = System.currentTimeMillis();
@@ -285,66 +284,49 @@ public class BufferWriteProcessor extends Processor {
getProcessorName(), displayMessage, e);
result = false;
} finally {
- synchronized (flushStatus) {
- flushStatus.setUnFlushing();
- switchFlushToWork();
- flushStatus.notifyAll();
- LOGGER.info("The bufferwrite processor {} ends flushing {}.",
getProcessorName(),
+ switchFlushToWork();
+ LOGGER.info("The bufferwrite processor {} ends flushing {}.",
getProcessorName(),
displayMessage);
- }
}
- long flushEndTime = System.currentTimeMillis();
- long flushInterval = flushEndTime - flushStartTime;
- ZonedDateTime startDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "The bufferwrite processor {} flush {}, start time is {}, flush end
time is {}, "
- + "flush time consumption is {}ms",
- getProcessorName(), displayMessage, startDateTime, endDateTime,
flushInterval);
+ if (LOGGER.isInfoEnabled()) {
+ long flushEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The bufferwrite processor {} flush {}, start time is {}, flush end
time is {}, "
+ + "flush time consumption is {}ms",
+ getProcessorName(), displayMessage,
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+ flushEndTime - flushStartTime);
+ }
return result;
}
+ // keyword synchronized is added in this method, so that only one flush task
can be submitted now.
@Override
- public Future<Boolean> flush() throws IOException {
+ public synchronized Future<Boolean> flush() throws IOException {
// statistic information for flush
if (lastFlushTime > 0) {
- long thisFlushTime = System.currentTimeMillis();
- long flushTimeInterval = thisFlushTime - lastFlushTime;
- ZonedDateTime lastDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastFlushTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime thisDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisFlushTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "The bufferwrite processor {}: last flush time is {}, this flush
time is {}, "
- + "flush time interval is {}s",
- getProcessorName(), lastDateTime, thisDateTime, flushTimeInterval /
1000);
+ if (LOGGER.isInfoEnabled()) {
+ long thisFlushTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The bufferwrite processor {}: last flush time is {}, this flush
time is {}, "
+ + "flush time interval is {}s", getProcessorName(),
+ DatetimeUtils.convertMillsecondToZonedDateTime(lastFlushTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
+ (thisFlushTime - lastFlushTime) / 1000);
+ }
}
lastFlushTime = System.currentTimeMillis();
// check value count
if (valueCount > 0) {
// waiting for the end of last flush operation.
-// synchronized (flushStatus) {
-// while (flushStatus.isFlushing()) {
-// try {
-// flushStatus.wait();
-// } catch (InterruptedException e) {
-// LOGGER.error(
-// "Encounter an interrupt error when waitting for the
flushing, "
-// + "the bufferwrite processor is {}.",
-// getProcessorName(), e);
-// Thread.currentThread().interrupt();
-// }
-// }
-// }
try {
flushFuture.get();
} catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
LOGGER.error("Encounter an interrupt error when waitting for the
flushing, "
+ "the bufferwrite processor is {}.",
getProcessorName(), e);
+ Thread.currentThread().interrupt();
}
// update the lastUpdatetime, prepare for flush
try {
@@ -357,15 +339,15 @@ public class BufferWriteProcessor extends Processor {
logNode.notifyStartFlush();
}
valueCount = 0;
- flushStatus.setFlushing();
switchWorkToFlush();
BasicMemController.getInstance().reportFree(this, memSize.get());
memSize.set(0);
// switch
- return FlushManager.getInstance().submit(() ->
flushTask("asynchronously"));
+ flushFuture = FlushManager.getInstance().submit(() ->
flushTask("asynchronously"));
} else{
- return new ImmediateFuture<>(true);
+ flushFuture = new ImmediateFuture<>(true);
}
+ return flushFuture;
}
@Override
@@ -386,16 +368,16 @@ public class BufferWriteProcessor extends Processor {
// flush the changed information for filenode
filenodeFlushAction.act();
// delete the restore for this bufferwrite processor
- long closeEndTime = System.currentTimeMillis();
- long closeInterval = closeEndTime - closeStartTime;
- ZonedDateTime startDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "Close bufferwrite processor {}, the file name is {}, start time is
{}, end time is {}, "
- + "time consumption is {}ms",
- getProcessorName(), fileName, startDateTime, endDateTime,
closeInterval);
+ if (LOGGER.isInfoEnabled()) {
+ long closeEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "Close bufferwrite processor {}, the file name is {}, start time
is {}, end time is {}, "
+ + "time consumption is {}ms",
+ getProcessorName(), fileName,
+ DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+ closeEndTime - closeStartTime);
+ }
} catch (IOException e) {
LOGGER.error("Close the bufferwrite processor error, the bufferwrite is
{}.",
getProcessorName(), e);
@@ -418,9 +400,7 @@ public class BufferWriteProcessor extends Processor {
* @return True if flushing
*/
public boolean isFlush() {
- synchronized (flushStatus) {
- return flushStatus.isFlushing();
- }
+ return !flushFuture.isDone();
}
/**
@@ -503,4 +483,20 @@ public class BufferWriteProcessor extends Processor {
public WriteLogNode getLogNode() {
return logNode;
}
+
+ /**
+ * used for test. We can know when the flush() is called.
+ * @return the last flush() time.
+ */
+ public long getLastFlushTime() {
+ return lastFlushTime;
+ }
+
+ /**
+ * used for test. We can block to wait for finishing flushing.
+ * @return the future of the flush() task.
+ */
+ public Future<Boolean> getFlushFuture() {
+ return flushFuture;
+ }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 07c5161..3e53acd 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -27,10 +27,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -46,8 +46,8 @@ import
org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -68,14 +68,13 @@ public class OverflowProcessor extends Processor {
private static final Logger LOGGER =
LoggerFactory.getLogger(OverflowProcessor.class);
private static final IoTDBConfig TsFileDBConf =
IoTDBDescriptor.getInstance().getConfig();
- private static final TSFileConfig TsFileConf =
TSFileDescriptor.getInstance().getConfig();
private OverflowResource workResource;
private OverflowResource mergeResource;
private OverflowSupport workSupport;
private OverflowSupport flushSupport;
- private volatile FlushStatus flushStatus = new FlushStatus();
+ private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
private volatile boolean isMerge;
private int valueCount;
private String parentPath;
@@ -298,15 +297,20 @@ public class OverflowProcessor extends Processor {
TSDataType dataType)
{
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (flushStatus.isFlushing()) {
+ queryFlushLock.lock();
+ try {
+ if (flushSupport != null) {
+ memSeriesLazyMerger
+ .addMemSeries(
+ flushSupport.queryOverflowInsertInMemory(deviceId,
measurementId, dataType));
+ }
memSeriesLazyMerger
- .addMemSeries(
- flushSupport.queryOverflowInsertInMemory(deviceId,
measurementId, dataType));
+ .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId,
measurementId,
+ dataType));
+ return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
+ } finally {
+ queryFlushLock.unlock();
}
- memSeriesLazyMerger
- .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId,
measurementId,
- dataType));
- return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
}
/**
@@ -372,10 +376,9 @@ public class OverflowProcessor extends Processor {
if (!isMerge) {
return new Pair<>(null, null);
}
- Pair<String, List<ChunkMetaData>> pair = new Pair<>(
+ return new Pair<>(
mergeResource.getInsertFilePath(),
mergeResource.getInsertMetadatas(deviceId, measurementId,
dataType));
- return pair;
}
private void switchWorkToFlush() {
@@ -425,12 +428,10 @@ public class OverflowProcessor extends Processor {
}
public boolean isFlush() {
- synchronized (flushStatus) {
- return flushStatus.isFlushing();
- }
+ return !flushFuture.isDone();
}
- private boolean flushOperation(String displayMessage) {
+ private boolean flushTask(String displayMessage) {
boolean result;
long flushStartTime = System.currentTimeMillis();
try {
@@ -450,29 +451,27 @@ public class OverflowProcessor extends Processor {
getProcessorName(), displayMessage,
Thread.currentThread().getName(), e);
result = false;
} finally {
- synchronized (flushStatus) {
- flushStatus.setUnFlushing();
// switch from flush to work.
switchFlushToWork();
- flushStatus.notifyAll();
- }
}
// log flush time
- LOGGER.info("The overflow processor {} ends flushing {}.",
getProcessorName(), displayMessage);
- long flushEndTime = System.currentTimeMillis();
- long timeInterval = flushEndTime - flushStartTime;
- ZonedDateTime startDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "The overflow processor {} flush {}, start time is {}, flush end
time is {}," +
- " time consumption is {}ms",
- getProcessorName(), displayMessage, startDateTime, endDateTime,
timeInterval);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER
+ .info("The overflow processor {} ends flushing {}.",
getProcessorName(), displayMessage);
+ long flushEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The overflow processor {} flush {}, start time is {}, flush end
time is {}," +
+ " time consumption is {}ms",
+ getProcessorName(), displayMessage,
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+ flushEndTime - flushStartTime);
+ }
return result;
}
- private Future<Boolean> flush(boolean synchronization) throws
OverflowProcessorException {
+ @Override
+ public synchronized Future<Boolean> flush() throws IOException {
// statistic information for flush
if (lastFlushTime > 0) {
long thisFLushTime = System.currentTimeMillis();
@@ -489,21 +488,20 @@ public class OverflowProcessor extends Processor {
lastFlushTime = System.currentTimeMillis();
// value count
if (valueCount > 0) {
- synchronized (flushStatus) {
- while (flushStatus.isFlushing()) {
- try {
- flushStatus.wait();
- } catch (InterruptedException e) {
- LOGGER.error("Waiting the flushstate error in flush row group to
store.", e);
- }
- }
+ try {
+ flushFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Encounter an interrupt error when waitting for the
flushing, "
+ + "the bufferwrite processor is {}.",
+ getProcessorName(), e);
+ Thread.currentThread().interrupt();
}
try {
// backup newIntervalFile list and emptyIntervalFileNode
overflowFlushAction.act();
} catch (Exception e) {
LOGGER.error("Flush the overflow rowGroup to file faied, when
overflowFlushAction act");
- throw new OverflowProcessorException(e);
+ throw new IOException(e);
}
if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
@@ -518,47 +516,42 @@ public class OverflowProcessor extends Processor {
memSize.set(0);
valueCount = 0;
// switch from work to flush
- flushStatus.setFlushing();
switchWorkToFlush();
- if (synchronization) {
- return new ImmediateFuture<>(flushOperation("synchronously"));
- } else {
- return FlushManager.getInstance().submit( () ->
- flushOperation("asynchronously"));
- }
+ flushFuture = FlushManager.getInstance().submit( () ->
+ flushTask("asynchronously"));
} else {
- return new ImmediateFuture(true);
+ flushFuture = new ImmediateFuture(true);
}
+ return flushFuture;
}
@Override
- public Future<Boolean> flush() throws IOException {
- try {
- return flush(false);
- } catch (OverflowProcessorException e) {
- throw new IOException(e);
- }
- }
-
- @Override
public void close() throws OverflowProcessorException {
LOGGER.info("The overflow processor {} starts close operation.",
getProcessorName());
long closeStartTime = System.currentTimeMillis();
// flush data
- flush(true);
- LOGGER.info("The overflow processor {} ends close operation.",
getProcessorName());
- // log close time
- long closeEndTime = System.currentTimeMillis();
- long timeInterval = closeEndTime - closeStartTime;
- ZonedDateTime startDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "The close operation of overflow processor {} starts at {} and
ends at {}."
- + " It comsumes {}ms.",
- getProcessorName(), startDateTime, endDateTime, timeInterval);
+ try {
+ flush().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Encounter an interrupt error when waitting for the
flushing, "
+ + "the bufferwrite processor is {}.",
+ getProcessorName(), e);
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ throw new OverflowProcessorException(e);
+ }
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("The overflow processor {} ends close operation.",
getProcessorName());
+ // log close time
+ long closeEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The close operation of overflow processor {} starts at {} and ends
at {}."
+ + " It comsumes {}ms.",
+ getProcessorName(),
DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+ closeEndTime - closeStartTime);
+ }
}
public void clear() throws IOException {
@@ -659,7 +652,7 @@ public class OverflowProcessor extends Processor {
Objects.equals(mergeResource, that.mergeResource) &&
Objects.equals(workSupport, that.workSupport) &&
Objects.equals(flushSupport, that.flushSupport) &&
- Objects.equals(flushStatus, that.flushStatus) &&
+ Objects.equals(flushFuture, that.flushFuture) &&
Objects.equals(parentPath, that.parentPath) &&
Objects.equals(dataPahtCount, that.dataPahtCount) &&
Objects.equals(queryFlushLock, that.queryFlushLock) &&
@@ -673,8 +666,24 @@ public class OverflowProcessor extends Processor {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), workResource, mergeResource,
workSupport,
- flushSupport, flushStatus, isMerge, valueCount, parentPath,
lastFlushTime,
+ flushSupport, flushFuture, isMerge, valueCount, parentPath,
lastFlushTime,
dataPahtCount, queryFlushLock, overflowFlushAction,
filenodeFlushAction, fileSchema,
memThreshold, memSize, logNode);
}
+
+ /**
+ * used for test. We can block to wait for finishing flushing.
+ * @return the future of the flush() task.
+ */
+ public Future<Boolean> getFlushFuture() {
+ return flushFuture;
+ }
+
+ /**
+ * used for test. We can know when the flush() is called.
+ * @return the last flush() time.
+ */
+ public long getLastFlushTime() {
+ return lastFlushTime;
+ }
}
\ No newline at end of file
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
deleted file mode 100644
index f437c96..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.utils;
-
-/**
- * This class is used to represent the state of flush. It's can be used in the
bufferwrite
- * flush{@code SequenceFileManager} and overflow flush{@code
OverFlowProcessor}.
- */
-public class FlushStatus {
-
- private boolean isFlushing;
-
- public FlushStatus() {
- this.isFlushing = false;
- }
-
- public boolean isFlushing() {
- return isFlushing;
- }
-
- public void setFlushing() {
- this.isFlushing = true;
- }
-
- public void setUnFlushing() {
- this.isFlushing = false;
- }
-
-}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
index 5011071..266c439 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
@@ -27,6 +27,7 @@ import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.SignStyle;
import java.time.temporal.ChronoField;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
public class DatetimeUtils {
@@ -206,4 +207,9 @@ public class DatetimeUtils {
public static ZoneOffset toZoneOffset(ZoneId zoneId) {
return zoneId.getRules().getOffset(Instant.now());
}
+
+ public static ZonedDateTime convertMillsecondToZonedDateTime(long
millisecond) {
+ return ZonedDateTime.ofInstant(Instant.ofEpochMilli(millisecond),
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ }
}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index 0efe087..04046b8 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -26,7 +26,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -39,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -123,13 +126,18 @@ public class BufferWriteProcessorNewTest {
assertEquals(num, timeValuePair.getValue().getInt());
}
assertEquals(false, bufferwrite.isFlush());
+ long lastFlushTime = bufferwrite.getLastFlushTime();
// flush asynchronously
bufferwrite.flush();
- assertEquals(true, bufferwrite.isFlush());
+ assertEquals(true, bufferwrite.getLastFlushTime() != lastFlushTime);
assertEquals(true, bufferwrite.canBeClosed());
// waiting for the end of flush.
- while (bufferwrite.isFlush()) {
- TimeUnit.SECONDS.sleep(1);
+ try {
+ bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ //because UT uses a mock flush operation, 10 seconds should be enough.
+ Assert.fail("mock flush spends more than 10 seconds... "
+ + "Please modify the value or change a better test environment");
}
pair = bufferwrite.queryBufferWriteData(processorName, measurementId,
dataType);
left = pair.left;
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 95e0d55..6a2e474 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.bufferwrite;
import static org.junit.Assert.assertEquals;
+import ch.qos.logback.core.util.TimeUtil;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -27,7 +28,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.PathUtils;
@@ -194,14 +197,23 @@ public class BufferWriteProcessorTest {
assertEquals(0, bufferwrite.memoryUsage());
assertEquals(TsFileIOWriter.magicStringBytes.length,
bufferwrite.getFileSize());
assertEquals(0, bufferwrite.getMetaSize());
+ long lastFlushTime = bufferwrite.getLastFlushTime();
for (int i = 1; i <= 85; i++) {
bufferwrite.write(deviceId, measurementId, i, dataType,
String.valueOf(i));
assertEquals(i * 12, bufferwrite.memoryUsage());
}
+ assertEquals(lastFlushTime, bufferwrite.getLastFlushTime());
bufferwrite.write(deviceId, measurementId, 86, dataType,
String.valueOf(86));
- assertEquals(true, bufferwrite.isFlush());
+ //assert a flush() is called.
+ assertEquals(false, bufferwrite.getLastFlushTime()==lastFlushTime);
// sleep to the end of flush
- TimeUnit.SECONDS.sleep(2);
+ try {
+ bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ //because UT uses a mock flush operation, 10 seconds should be enough.
+ Assert.fail("mock flush spends more than 10 seconds... "
+ + "Please modify the value or change a better test environment");
+ }
assertEquals(false, bufferwrite.isFlush());
assertEquals(0, bufferwrite.memoryUsage());
// query result
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index 43469e2..bb2bddb 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -89,7 +89,6 @@ public class OverflowProcessorTest {
assertEquals(0,
overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
processor.flush();
- assertEquals(false, processor.isFlush());
assertEquals(false, processor.isMerge());
// write insert data
OverflowTestUtils.produceInsertData(processor);