tsreaper commented on code in PR #351:
URL: https://github.com/apache/flink-table-store/pull/351#discussion_r1015169082
##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java:
##########
@@ -296,6 +299,52 @@ public void testWriteWithoutCompaction() throws Exception {
         }
     }
+    @Test
+    public void testReadCompactedSnapshot() throws Exception {
+        writeCompactData();
+        FileStoreTable table =
+                createFileStoreTable(conf -> conf.set(CoreOptions.READ_COMPACTED, true));
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        Snapshot compactedSnapshot = table.snapshotManager().snapshot(plan.snapshotId);
+        Iterator<Snapshot> snapshotIterator = table.snapshotManager().snapshots();
+        while (snapshotIterator.hasNext()) {
+            Snapshot snapshot = snapshotIterator.next();
+            if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                assertThat(snapshot.id()).isLessThanOrEqualTo(compactedSnapshot.id());
+            }
+        }
+
+        assertThat(compactedSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        List<Split> splits = plan.splits();
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING).size())
+                .isGreaterThan(0);
Review Comment:
In this test the latest snapshot is itself a compacted snapshot, so a pass proves nothing: an implementation that simply reads the latest snapshot would also pass. Please change the test so the two behaviors can be told apart.
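For example (just a sketch; `writeData()` is a hypothetical helper standing in for whatever this test base uses to commit more rows), you could append data after the compaction so the latest snapshot is no longer a COMPACT one, then assert that the plan still picks the older compacted snapshot:

```java
// Sketch: commit extra data after compaction via a hypothetical
// writeData() helper, so the latest snapshot is not a COMPACT one.
writeData();

SnapshotManager snapshotManager = table.snapshotManager();
Long latestId = snapshotManager.latestSnapshotId();
assertThat(snapshotManager.snapshot(latestId).commitKind())
        .isNotEqualTo(Snapshot.CommitKind.COMPACT);

// A correct read-compacted plan must now point at an older snapshot.
DataTableScan.DataFilePlan plan = table.newScan().plan();
assertThat(plan.snapshotId).isLessThan(latestId);
```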
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -75,7 +76,8 @@ public AbstractFileStoreScan(
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            CoreOptions.ChangelogProducer changelogProducer) {
+            CoreOptions.ChangelogProducer changelogProducer,
+            boolean readCompacted) {
Review Comment:
I prefer adding a `withReadCompacted(boolean)` method to the scan interface, just like `withIncrement` and the other with* methods. That way we don't need to thread the parameter through every constructor along the way.
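Roughly this shape (a sketch, not the final API; naming and placement are up to you):

```java
// On the scan interface, next to withIncrement and the other with* setters:
FileStoreScan withReadCompacted(boolean readCompacted);

// In AbstractFileStoreScan, replacing the extra constructor argument:
private boolean readCompacted = false;

@Override
public FileStoreScan withReadCompacted(boolean readCompacted) {
    this.readCompacted = readCompacted;
    return this;
}
```

Callers can then opt in per scan, e.g. `scan.withReadCompacted(true).plan()`, without every constructor in the chain knowing about the flag.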
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java:
##########
@@ -87,6 +89,7 @@ public AbstractFileStoreScan(
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.changelogProducer = changelogProducer;
+        this.readCompacted = readCompacted;
Review Comment:
What happens if the user sets the changelog producer to `INPUT` and at the same time uses `readCompacted`? It seems we would scan nothing, which is unfriendly to the user.
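A friendlier option (just a sketch of the idea; where the check lives is up to the implementation) is to reject the combination explicitly instead of silently returning an empty scan:

```java
// Sketch: fail fast on the conflicting combination.
if (readCompacted && changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
    throw new IllegalArgumentException(
            "Cannot read compacted snapshots while the changelog producer is INPUT: "
                    + "compact snapshots produce no input changelog, so the scan "
                    + "would return nothing.");
}
```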
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java:
##########
@@ -90,6 +90,25 @@ public boolean snapshotExists(long snapshotId) {
         }
     }
+    public @Nullable Long latestCompactedSnapshotId() {
+        try {
+            Iterator<Snapshot> iterator = snapshots();
+            Long maxCompactedSnapshotId = null;
+            while (iterator.hasNext()) {
+                Snapshot snapshot = iterator.next();
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    if (maxCompactedSnapshotId == null || snapshot.id() > maxCompactedSnapshotId) {
+                        maxCompactedSnapshotId = snapshot.id();
+                    }
+                }
+            }
+
+            return maxCompactedSnapshotId;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find latest compacted snapshot id", e);
+        }
Review Comment:
This implementation is very inefficient when there are thousands of snapshots. Try iterating backwards from the latest snapshot; as soon as you see a COMPACT snapshot you can return it directly.
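Something along these lines (a sketch; it assumes `SnapshotManager` exposes `latestSnapshotId()` and `earliestSnapshotId()`, or equivalent bounds):

```java
public @Nullable Long latestCompactedSnapshotId() {
    Long latestId = latestSnapshotId();
    Long earliestId = earliestSnapshotId();
    if (latestId == null || earliestId == null) {
        return null;
    }
    // Walk backwards from the newest snapshot; the first COMPACT
    // snapshot found is by definition the latest compacted one.
    for (long id = latestId; id >= earliestId; id--) {
        if (snapshot(id).commitKind() == Snapshot.CommitKind.COMPACT) {
            return id;
        }
    }
    return null;
}
```

When compactions happen regularly, this touches only a handful of snapshot files instead of all of them.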