JingsongLi commented on code in PR #460:
URL: https://github.com/apache/flink-table-store/pull/460#discussion_r1063181702
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java:
##########
@@ -166,67 +191,83 @@ public List<Committable> prepareCommit(boolean
doCompaction, long checkpointId)
currentFirstWriteMs = null;
}
- if (snapshotIdentifierToCheck == null // wait for last forced full
compaction to complete
- && !writtenBuckets.isEmpty() // there should be something to
compact
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>>
checkpointIdAndBuckets :
+ writtenBuckets.entrySet()) {
+ LOG.debug(
+ "Written buckets for checkpoint #{} are:",
checkpointIdAndBuckets.getKey());
+ for (Tuple2<BinaryRowData, Integer> bucket :
checkpointIdAndBuckets.getValue()) {
+ LOG.debug(" * partition {}, bucket {}", bucket.f0,
bucket.f1);
+ }
+ }
+ }
+
+ if (!writtenBuckets.isEmpty() // there should be something to compact
&& System.currentTimeMillis() -
firstWriteMs.firstEntry().getValue()
>= fullCompactionThresholdMs // time without full
compaction exceeds
) {
doCompaction = true;
}
if (doCompaction) {
- snapshotIdentifierToCheck = checkpointId;
- Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new
HashSet<>();
- for (Set<Tuple2<BinaryRowData, Integer>> buckets :
writtenBuckets.values()) {
- for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
- if (compactedBuckets.contains(bucket)) {
- continue;
- }
- compactedBuckets.add(bucket);
- try {
- write.compact(bucket.f0, bucket.f1, true);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submit full compaction for checkpoint #{}",
checkpointId);
}
+ submitFullCompaction();
+ commitIdentifiersToCheck.add(checkpointId);
}
return super.prepareCommit(doCompaction, checkpointId);
}
- private Optional<Snapshot> findSnapshot(long identifierToCheck) {
- // TODO We need a mechanism to do timeout recovery in case of snapshot
expiration.
+ private void checkSuccessfulFullCompaction() {
SnapshotManager snapshotManager = table.snapshotManager();
Long latestId = snapshotManager.latestSnapshotId();
if (latestId == null) {
- return Optional.empty();
+ return;
}
-
Long earliestId = snapshotManager.earliestSnapshotId();
if (earliestId == null) {
- return Optional.empty();
+ return;
}
- // We must find the snapshot whose identifier is exactly
`identifierToCheck`.
- // We can't just compare with the latest snapshot identifier by this
user (like what we do
- // in `AbstractFileStoreWrite`), because even if the latest snapshot
identifier is newer,
- // compact changes may still be discarded due to commit conflicts.
for (long id = latestId; id >= earliestId; id--) {
Snapshot snapshot = snapshotManager.snapshot(id);
- if (!snapshot.commitUser().equals(commitUser)) {
- continue;
- }
- if (snapshot.commitIdentifier() == identifierToCheck) {
- return Optional.of(snapshot);
- } else if (snapshot.commitIdentifier() < identifierToCheck) {
- // We're searching from new snapshots to old ones. So if we
find an older
- // identifier, we can make sure our target snapshot will never
occur.
- return Optional.empty();
+ if (snapshot.commitUser().equals(commitUser)
+ && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ long commitIdentifier = snapshot.commitIdentifier();
+ if (commitIdentifiersToCheck.contains(commitIdentifier)) {
+ // we found a full compaction snapshot
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Found full compaction snapshot #{} with
identifier {}",
+ id,
+ commitIdentifier);
+ }
+ writtenBuckets.headMap(commitIdentifier, true).clear();
+ firstWriteMs.headMap(commitIdentifier, true).clear();
+ commitIdentifiersToCheck.headSet(commitIdentifier).clear();
+ break;
+ }
}
}
+ }
- return Optional.empty();
+ private void submitFullCompaction() {
+ Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
+ for (Set<Tuple2<BinaryRowData, Integer>> buckets :
writtenBuckets.values()) {
Review Comment:
Minor: consider iterating the map with `forEach`:
```
writtenBuckets.forEach(
(cp, buckets) -> {
for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
writtenBucketList.add(Tuple3.of(cp, bucket.f0, bucket.f1));
}
});
```
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java:
##########
@@ -166,67 +191,83 @@ public List<Committable> prepareCommit(boolean
doCompaction, long checkpointId)
currentFirstWriteMs = null;
}
- if (snapshotIdentifierToCheck == null // wait for last forced full
compaction to complete
- && !writtenBuckets.isEmpty() // there should be something to
compact
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>>
checkpointIdAndBuckets :
+ writtenBuckets.entrySet()) {
+ LOG.debug(
+ "Written buckets for checkpoint #{} are:",
checkpointIdAndBuckets.getKey());
+ for (Tuple2<BinaryRowData, Integer> bucket :
checkpointIdAndBuckets.getValue()) {
+ LOG.debug(" * partition {}, bucket {}", bucket.f0,
bucket.f1);
+ }
+ }
+ }
+
+ if (!writtenBuckets.isEmpty() // there should be something to compact
&& System.currentTimeMillis() -
firstWriteMs.firstEntry().getValue()
>= fullCompactionThresholdMs // time without full
compaction exceeds
) {
doCompaction = true;
}
if (doCompaction) {
- snapshotIdentifierToCheck = checkpointId;
- Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new
HashSet<>();
- for (Set<Tuple2<BinaryRowData, Integer>> buckets :
writtenBuckets.values()) {
- for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
- if (compactedBuckets.contains(bucket)) {
- continue;
- }
- compactedBuckets.add(bucket);
- try {
- write.compact(bucket.f0, bucket.f1, true);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submit full compaction for checkpoint #{}",
checkpointId);
}
+ submitFullCompaction();
+ commitIdentifiersToCheck.add(checkpointId);
}
return super.prepareCommit(doCompaction, checkpointId);
}
- private Optional<Snapshot> findSnapshot(long identifierToCheck) {
- // TODO We need a mechanism to do timeout recovery in case of snapshot
expiration.
+ private void checkSuccessfulFullCompaction() {
SnapshotManager snapshotManager = table.snapshotManager();
Long latestId = snapshotManager.latestSnapshotId();
if (latestId == null) {
- return Optional.empty();
+ return;
}
-
Long earliestId = snapshotManager.earliestSnapshotId();
if (earliestId == null) {
- return Optional.empty();
+ return;
}
- // We must find the snapshot whose identifier is exactly
`identifierToCheck`.
- // We can't just compare with the latest snapshot identifier by this
user (like what we do
- // in `AbstractFileStoreWrite`), because even if the latest snapshot
identifier is newer,
- // compact changes may still be discarded due to commit conflicts.
for (long id = latestId; id >= earliestId; id--) {
Snapshot snapshot = snapshotManager.snapshot(id);
- if (!snapshot.commitUser().equals(commitUser)) {
- continue;
- }
- if (snapshot.commitIdentifier() == identifierToCheck) {
- return Optional.of(snapshot);
- } else if (snapshot.commitIdentifier() < identifierToCheck) {
- // We're searching from new snapshots to old ones. So if we
find an older
- // identifier, we can make sure our target snapshot will never
occur.
- return Optional.empty();
+ if (snapshot.commitUser().equals(commitUser)
+ && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ long commitIdentifier = snapshot.commitIdentifier();
+ if (commitIdentifiersToCheck.contains(commitIdentifier)) {
+ // we found a full compaction snapshot
Review Comment:
Please add more comments explaining why we need to find a full compaction snapshot here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]