tsreaper commented on code in PR #315:
URL: https://github.com/apache/flink-table-store/pull/315#discussion_r998904675
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##########
@@ -122,48 +139,61 @@ public void write(KeyValue kv) throws Exception {
 
     @Override
     public long memoryOccupancy() {
-        return memTable.memoryOccupancy();
+        return writeBuffer.memoryOccupancy();
     }
 
     @Override
     public void flushMemory() throws Exception {
-        if (memTable.size() > 0) {
+        boolean success = writeBuffer.flushMemory();
+        if (!success) {
+            flushWriteBuffer();
+        }
+    }
+
+    private void flushWriteBuffer() throws Exception {
+        if (writeBuffer.size() > 0) {
             if (compactManager.shouldWaitCompaction()) {
                 // stop writing, wait for compaction finished
                 trySyncLatestCompaction(true);
             }
-            // write changelog file
-            //
-            // NOTE: We must first call memTable.rawIterator(), then call memTable.mergeIterator().
-            // Otherwise memTable.rawIterator() will generate sorted, but not yet merged, data. See
-            // comments on MemTable for more details.
-            if (changelogProducer == ChangelogProducer.INPUT) {
-                KeyValueDataFileWriter changelogWriter = writerFactory.createChangelogFileWriter(0);
-                changelogWriter.write(memTable.rawIterator());
-                changelogWriter.close();
-                changelogFiles.add(changelogWriter.result());
+            final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
+                    changelogProducer == ChangelogProducer.INPUT
+                            ? writerFactory.createRollingChangelogFileWriter(0)
+                            : null;
Review Comment:
Then add comments to `KeyValueDataFileWriter`, stating that records given to
the writer must be sorted by key, because we're not comparing the min/max keys.
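
For reference, a minimal, hypothetical sketch of the invariant such a comment would describe (class and member names below are made up and are not the actual `KeyValueDataFileWriter` API): the min/max keys are taken from the first and last written records without any key comparison, so the resulting file metadata is only correct when the input is already sorted.

```java
// Hypothetical sketch only, NOT the real KeyValueDataFileWriter. It illustrates
// why the input must already be sorted by key: the writer never compares keys,
// it simply treats the first written key as the file's min key and the last
// written key as the max key, so unsorted input silently produces wrong metadata.
import java.util.ArrayList;
import java.util.List;

class SortedInputFileWriterSketch {

    private String minKey; // key of the first record, assumed to be the smallest
    private String maxKey; // key of the last record, assumed to be the largest
    private final List<String> records = new ArrayList<>();

    /** Records MUST arrive sorted by key; no comparison is performed here. */
    void write(String key, String value) {
        if (minKey == null) {
            minKey = key;
        }
        maxKey = key;
        records.add(key + "=" + value);
    }

    /** Metadata built from the first/last key; incorrect if the input was unsorted. */
    String result() {
        return "meta{minKey=" + minKey + ", maxKey=" + maxKey
                + ", recordCount=" + records.size() + "}";
    }
}
```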