This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch snapshot/1.1.0-241216
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/snapshot/1.1.0-241216 by this
push:
new 16ea31a0 [To dev/1.1] Add FlushChunkMetadataListener (#328) (#336)
16ea31a0 is described below
commit 16ea31a09a717c837749e2262fa596f7ed36d56f
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Dec 16 18:10:19 2024 +0800
[To dev/1.1] Add FlushChunkMetadataListener (#328) (#336)
* Add FlushChunkMetadataListener
* Fix and add test
(cherry picked from commit a90cdbf0da3b001b720797b0dd8a9bcaa4f970fc)
---
.../write/writer/FlushChunkMetadataListener.java | 33 +++++++++++++++++
.../apache/tsfile/write/writer/TsFileIOWriter.java | 43 ++++++++++++++++++++--
.../writer/TsFileIOWriterMemoryControlTest.java | 43 ++++++++++++++++++++++
3 files changed, 115 insertions(+), 4 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
new file mode 100644
index 00000000..237cb191
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.List;
+
+@FunctionalInterface
+public interface FlushChunkMetadataListener {
+
+ // Pair<device id, measurement id> -> chunk metadata list
+ void onFlush(List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
sortedChunkMetadataList);
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index ef9515c5..b7d71972 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -116,6 +116,8 @@ public class TsFileIOWriter implements AutoCloseable {
private volatile int chunkMetadataCount = 0;
public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";
+ private final List<FlushChunkMetadataListener> flushListeners = new
ArrayList<>();
+
/** Empty constructor. */
protected TsFileIOWriter() {}
@@ -156,6 +158,10 @@ public class TsFileIOWriter implements AutoCloseable {
chunkMetadataTempFile = new File(file.getAbsolutePath() +
CHUNK_METADATA_TEMP_FILE_SUFFIX);
}
+ public void addFlushListener(FlushChunkMetadataListener listener) {
+ flushListeners.add(listener);
+ }
+
/**
* Writes given bytes to output stream. This method is called when total
memory size exceeds the
* chunk group size threshold.
@@ -580,6 +586,19 @@ public class TsFileIOWriter implements AutoCloseable {
this.maxPlanIndex = maxPlanIndex;
}
+ public long getMaxMetadataSize() {
+ return maxMetadataSize;
+ }
+
+ /**
+ * Set the max memory size of chunk metadata. Note that the new limit may be
smaller than the current
+ * chunk metadata size, so the caller should call {@link
#checkMetadataSizeAndMayFlush()} after
+ * this to avoid violating memory control.
+ */
+ public void setMaxMetadataSize(long maxMetadataSize) {
+ this.maxMetadataSize = maxMetadataSize;
+ }
+
/**
* Check if the size of chunk metadata in memory is greater than the given
threshold. If so, the
* chunk metadata will be written to a temp file. <b>Notice! If you are
writing an aligned device
@@ -619,25 +638,41 @@ public class TsFileIOWriter implements AutoCloseable {
protected int sortAndFlushChunkMetadata() throws IOException {
int writtenSize = 0;
// group by series
- List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+ final List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
TSMIterator.sortChunkMetadata(
chunkGroupMetadataList, currentChunkGroupDeviceId,
chunkMetadataList);
if (tempOutput == null) {
tempOutput = new LocalTsFileOutput(new
FileOutputStream(chunkMetadataTempFile));
}
hasChunkMetadataInDisk = true;
+
+ // This list is the same as sortedChunkMetadataList, but Path is replaced
by Pair<IDeviceID,
+ // String>
+ final List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
+ sortedChunkMetadataListForCallBack = new ArrayList<>();
+
for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
- Path seriesPath = pair.left;
- boolean isNewPath = !seriesPath.equals(lastSerializePath);
+ final Path seriesPath = pair.left;
+ final boolean isNewPath = !seriesPath.equals(lastSerializePath);
if (isNewPath) {
// record the count of path to construct bloom filter later
pathCount++;
}
- List<IChunkMetadata> iChunkMetadataList = pair.right;
+ final List<IChunkMetadata> iChunkMetadataList = pair.right;
writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList,
seriesPath, isNewPath);
lastSerializePath = seriesPath;
+ sortedChunkMetadataListForCallBack.add(
+ new Pair<>(
+ new Pair<>(seriesPath.getIDeviceID(),
seriesPath.getMeasurement()),
+ iChunkMetadataList));
logger.debug("Flushing {}", seriesPath);
}
+
+ // notify the listeners
+ for (final FlushChunkMetadataListener listener : flushListeners) {
+ listener.onFlush(sortedChunkMetadataListForCallBack);
+ }
+
// clear the cache metadata to release the memory
chunkGroupMetadataList.clear();
if (chunkMetadataList != null) {
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
index 4b161ff8..ae8d07af 100644
---
a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
+++
b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -57,6 +57,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
public class TsFileIOWriterMemoryControlTest {
private static File testFile = new File("target", "1-1-0-0.tsfile");
@@ -249,6 +250,48 @@ public class TsFileIOWriterMemoryControlTest {
}
}
+ /** The following test is for calling listeners after flushing chunk
metadata. */
+ @Test
+ public void testFlushChunkMetadataListener() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 *
10)) {
+ final AtomicInteger cnt1 = new AtomicInteger(0);
+ final AtomicInteger cnt2 = new AtomicInteger(0);
+ writer.addFlushListener(sortedChunkMetadataList ->
cnt1.incrementAndGet());
+ writer.addFlushListener(sortedChunkMetadataList ->
cnt2.incrementAndGet());
+ List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ IDeviceID deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ generateBooleanData(1, 0L, new
ArrayList<>()).writeToFileWriter(writer);
+ generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer);
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile,
+ writer.chunkGroupMetadataList,
+ writer.endPosInCMTForDevice);
+ for (int i = 0; iterator.hasNext(); ++i) {
+ Pair<Path, TimeseriesMetadata> timeseriesMetadataPair =
iterator.next();
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ Assert.assertEquals(sortedSeriesId.get(i % 5),
timeseriesMetadata.getMeasurementId());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(),
timeseriesMetadata.getTsDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(),
timeseriesMetadata.getStatistics());
+ }
+ Assert.assertEquals(1, cnt1.get());
+ Assert.assertEquals(1, cnt2.get());
+ }
+ }
+
/** The following tests are for writing normal series in different numbers. */
/**