This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 537e8cf4 [FLINK-29745] Use independent reader/writer factory for compaction in MergeTreeTest
537e8cf4 is described below
commit 537e8cf4b3733d93720381c65b30024870ece533
Author: shammon <[email protected]>
AuthorDate: Tue Oct 25 11:40:37 2022 +0800
[FLINK-29745] Use independent reader/writer factory for compaction in MergeTreeTest
This closes #331
---
.../table/store/file/mergetree/MergeTreeTest.java | 32 +++++++++-------------
1 file changed, 13 insertions(+), 19 deletions(-)
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 583b83dc..c5fa6a1f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -86,7 +86,9 @@ public class MergeTreeTest {
private CoreOptions options;
private KeyValueFileReaderFactory readerFactory;
+ private KeyValueFileReaderFactory compactReaderFactory;
private KeyValueFileWriterFactory writerFactory;
+ private KeyValueFileWriterFactory compactWriterFactory;
private RecordWriter<KeyValue> writer;
@BeforeEach
@@ -108,24 +110,16 @@ public class MergeTreeTest {
RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType())));
RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType())));
FileFormat flushingAvro = new FlushingFileFormat("avro");
- readerFactory =
+ KeyValueFileReaderFactory.Builder readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
- new SchemaManager(path),
- 0,
- keyType,
- valueType,
- flushingAvro,
- pathFactory)
- .build(BinaryRowDataUtil.EMPTY_ROW, 0);
- writerFactory =
+ new SchemaManager(path), 0, keyType, valueType, flushingAvro, pathFactory);
+ readerFactory = readerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
+ compactReaderFactory = readerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
+ KeyValueFileWriterFactory.Builder writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
- 0,
- keyType,
- valueType,
- flushingAvro,
- pathFactory,
- options.targetFileSize())
- .build(BinaryRowDataUtil.EMPTY_ROW, 0);
+ 0, keyType, valueType, flushingAvro, pathFactory, options.targetFileSize());
+ writerFactory = writerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
+ compactWriterFactory = writerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
writer = createMergeTreeWriter(Collections.emptyList());
}
@@ -290,13 +284,13 @@ public class MergeTreeTest {
CompactRewriter rewriter =
(outputLevel, dropDelete, sections) -> {
RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+ compactWriterFactory.createRollingMergeTreeFileWriter(outputLevel);
writer.write(
new RecordReaderIterator<>(
new MergeTreeReader(
sections,
dropDelete,
- readerFactory,
+ compactReaderFactory,
comparator,
new DeduplicateMergeFunction())));
writer.close();
@@ -329,7 +323,7 @@ public class MergeTreeTest {
assertThat(remove).isTrue();
// See MergeTreeWriter.updateCompactResult
if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) {
- writerFactory.deleteFile(file.fileName());
+ compactWriterFactory.deleteFile(file.fileName());
}
}
compactedFiles.addAll(increment.compactIncrement().compactAfter());
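
For readability, the new setUp() wiring from the second hunk above can be read as the following condensed Java sketch. It is not the full test class: it assumes the existing MergeTreeTest fields and imports (path, pathFactory, options, keyType, valueType, flushingAvro, and the factory fields declared in the first hunk) and only restates the factory construction shown in the diff.

// Condensed sketch of the setUp() change, assuming the surrounding
// MergeTreeTest fields and imports shown elsewhere in this diff.
KeyValueFileReaderFactory.Builder readerFactoryBuilder =
        KeyValueFileReaderFactory.builder(
                new SchemaManager(path), 0, keyType, valueType, flushingAvro, pathFactory);
// Two independent reader factories built from the same builder: one for
// normal reads, one reserved for the compaction path.
readerFactory = readerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
compactReaderFactory = readerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);

KeyValueFileWriterFactory.Builder writerFactoryBuilder =
        KeyValueFileWriterFactory.builder(
                0, keyType, valueType, flushingAvro, pathFactory, options.targetFileSize());
// Likewise, compaction writes (and the deleteFile cleanup in the test's
// CompactRewriter) now go through their own writer factory rather than
// sharing the one used by the main write path.
writerFactory = writerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);
compactWriterFactory = writerFactoryBuilder.build(BinaryRowDataUtil.EMPTY_ROW, 0);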