tsreaper commented on code in PR #171:
URL: https://github.com/apache/flink-table-store/pull/171#discussion_r905872413
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java:
##########
@@ -38,20 +46,33 @@
*
* @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
*/
-public abstract class AbstractTableWrite<T> implements TableWrite {
+public abstract class AbstractTableWrite<T>
+ implements TableWrite, MemoryPoolFactory.PreemptRunner<RecordWriter<T>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTableWrite.class);
private final FileStoreWrite<T> write;
private final SinkRecordConverter recordConverter;
private final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
private final ExecutorService compactExecutor;
+ private final MemoryPoolFactory<RecordWriter<T>> memoryPoolFactory;
private boolean overwrite = false;
- protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
+ protected AbstractTableWrite(
+ FileStoreWrite<T> write,
+ SinkRecordConverter recordConverter,
+ FileStoreOptions options) {
this.write = write;
this.recordConverter = recordConverter;
+ MergeTreeOptions mergeTreeOptions = options.mergeTreeOptions();
+ HeapMemorySegmentPool memoryPool =
+ new HeapMemorySegmentPool(
+ mergeTreeOptions.writeBufferSize, mergeTreeOptions.pageSize);
+ this.memoryPoolFactory = new MemoryPoolFactory<>(memoryPool, this);
Review Comment:
Move this memory pool setup into the constructor of `KeyValueFileStoreWrite`.
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java:
##########
@@ -32,9 +33,18 @@
*/
public interface RecordWriter<T> {
+ /** Open the record write. */
+ void open(MemorySegmentPool memoryPool);
Review Comment:
No need to change the base interface. These methods are only useful in
`KeyValueRecordWriter`.
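A minimal sketch of what this could look like, assuming the pool-related method lives on a narrower type instead of the base interface. `PoolBackedWriter`, `SimplePool`, `BufferingWriter` and `PassThroughWriter` are hypothetical names for illustration only; they are not classes from this PR or from flink-table-store.

```java
import java.util.ArrayList;
import java.util.List;

/** Base interface stays free of memory-pool concerns. */
interface RecordWriter<T> {
    void write(T record);
}

/** Hypothetical stand-in for a memory segment pool (not the real Flink class). */
final class SimplePool {
    private int freePages;

    SimplePool(int pages) {
        this.freePages = pages;
    }

    /** Takes one page if available; returns false when the pool is empty. */
    boolean tryAcquirePage() {
        if (freePages == 0) {
            return false;
        }
        freePages--;
        return true;
    }

    int freePages() {
        return freePages;
    }
}

/** Hypothetical narrower interface: only writers that buffer in memory need open(). */
interface PoolBackedWriter<T> extends RecordWriter<T> {
    void open(SimplePool pool);
}

/** Plays the role of a key-value writer that buffers records in pooled memory. */
final class BufferingWriter implements PoolBackedWriter<String> {
    private final List<String> buffer = new ArrayList<>();
    private SimplePool pool;

    @Override
    public void open(SimplePool pool) {
        this.pool = pool;
    }

    @Override
    public void write(String record) {
        if (pool == null) {
            throw new IllegalStateException("writer not opened");
        }
        if (!pool.tryAcquirePage()) {
            throw new IllegalStateException("pool exhausted");
        }
        buffer.add(record);
    }

    int buffered() {
        return buffer.size();
    }
}

/** A writer with no buffer: it implements only the base interface. */
final class PassThroughWriter implements RecordWriter<String> {
    final List<String> out = new ArrayList<>();

    @Override
    public void write(String record) {
        out.add(record);
    }
}
```

With this shape, writers that never buffer in memory are not forced to implement a no-op `open`, and only the code that creates the buffering writer has to know about the pool.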
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java:
##########
@@ -95,7 +95,7 @@ public class MergeTreeOptions {
public final long writeBufferSize;
- public final long pageSize;
+ public final int pageSize;
Review Comment:
Why is this being changed?