JingsongLi commented on a change in pull request #30:
URL: https://github.com/apache/flink-table-store/pull/30#discussion_r817506286
##########
File path:
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
##########
@@ -129,38 +124,64 @@ private void expireUntil(long exclusiveId) {
// assume that all snapshots preceding it have been removed
break;
}
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to determine if snapshot #" + id + " still
exists", e);
+ }
- Snapshot toExpire =
Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ Snapshot toExpire =
Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ List<ManifestFileMeta> previousChanges =
manifestList.read(toExpire.previousChanges());
+ List<ManifestFileMeta> newChanges =
manifestList.read(toExpire.newChanges());
- for (ManifestEntry entry :
scan.withSnapshot(toExpire.id()).plan().files()) {
+ // we cannot delete an sst file directly when we meet a DELETE
entry, because that
+ // file might be upgraded
+ Set<Path> sstToDelete = new HashSet<>();
+ for (ManifestFileMeta meta : newChanges) {
+ for (ManifestEntry entry : manifestFile.read(meta.fileName()))
{
SstPathFactory sstPathFactory =
sstPathFactoryCache.getSstPathFactory(
entry.partition(), entry.bucket());
Path sstPath =
sstPathFactory.toPath(entry.file().fileName());
- if (!sstInUse.contains(sstPath)) {
- sstToDelete.add(sstPath);
+ switch (entry.kind()) {
+ case ADD:
+ sstToDelete.remove(sstPath);
+ break;
+ case DELETE:
+ sstToDelete.add(sstPath);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " +
entry.kind().name());
}
}
+ }
+ for (Path sst : sstToDelete) {
+ FileUtils.deleteOrWarn(sst);
+ }
- for (ManifestFileMeta manifest :
manifestList.read(toExpire.manifestList())) {
- if (!manifestsInUse.contains(manifest)) {
- manifestsToDelete.add(manifest.fileName());
- }
+ // collect useless manifests
+ for (ManifestFileMeta manifest : previousChanges) {
+ if (!manifestsInUse.contains(manifest)) {
+ manifestsToDelete.add(manifest);
+ }
+ }
+ for (ManifestFileMeta manifest : newChanges) {
+ if (!manifestsInUse.contains(manifest)) {
+ manifestsToDelete.add(manifest);
}
-
- manifestList.delete(toExpire.manifestList());
- FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to determine if snapshot #" + id + " still
exists", e);
}
- }
- for (Path sst : sstToDelete) {
- FileUtils.deleteOrWarn(sst);
+ // delete manifest lists
+ manifestList.delete(toExpire.previousChanges());
+ manifestList.delete(toExpire.newChanges());
+
+ // delete snapshot
+ FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
}
- for (String manifestName : manifestsToDelete) {
-
FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(manifestName));
+
+ // actual delete of manifest files
+ for (ManifestFileMeta manifest : manifestsToDelete) {
Review comment:
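Should we delete the manifest files before deleting the manifest lists?
That order ensures all files are eventually deleted. If expiration stops partway, the manifest lists still reference the remaining manifests, so they can still be found and removed instead of being orphaned.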
##########
File path:
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
##########
@@ -129,38 +124,64 @@ private void expireUntil(long exclusiveId) {
// assume that all snapshots preceding it have been removed
break;
}
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to determine if snapshot #" + id + " still
exists", e);
+ }
- Snapshot toExpire =
Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ Snapshot toExpire =
Snapshot.fromPath(pathFactory.toSnapshotPath(id));
+ List<ManifestFileMeta> previousChanges =
manifestList.read(toExpire.previousChanges());
+ List<ManifestFileMeta> newChanges =
manifestList.read(toExpire.newChanges());
- for (ManifestEntry entry :
scan.withSnapshot(toExpire.id()).plan().files()) {
+ // we cannot delete an sst file directly when we meet a DELETE
entry, because that
+ // file might be upgraded
+ Set<Path> sstToDelete = new HashSet<>();
+ for (ManifestFileMeta meta : newChanges) {
+ for (ManifestEntry entry : manifestFile.read(meta.fileName()))
{
SstPathFactory sstPathFactory =
sstPathFactoryCache.getSstPathFactory(
entry.partition(), entry.bucket());
Path sstPath =
sstPathFactory.toPath(entry.file().fileName());
- if (!sstInUse.contains(sstPath)) {
- sstToDelete.add(sstPath);
+ switch (entry.kind()) {
+ case ADD:
+ sstToDelete.remove(sstPath);
+ break;
+ case DELETE:
+ sstToDelete.add(sstPath);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " +
entry.kind().name());
}
}
+ }
+ for (Path sst : sstToDelete) {
+ FileUtils.deleteOrWarn(sst);
+ }
- for (ManifestFileMeta manifest :
manifestList.read(toExpire.manifestList())) {
- if (!manifestsInUse.contains(manifest)) {
- manifestsToDelete.add(manifest.fileName());
- }
+ // collect useless manifests
+ for (ManifestFileMeta manifest : previousChanges) {
+ if (!manifestsInUse.contains(manifest)) {
+ manifestsToDelete.add(manifest);
+ }
+ }
+ for (ManifestFileMeta manifest : newChanges) {
+ if (!manifestsInUse.contains(manifest)) {
+ manifestsToDelete.add(manifest);
}
-
- manifestList.delete(toExpire.manifestList());
- FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to determine if snapshot #" + id + " still
exists", e);
}
- }
- for (Path sst : sstToDelete) {
- FileUtils.deleteOrWarn(sst);
+ // delete manifest lists
+ manifestList.delete(toExpire.previousChanges());
+ manifestList.delete(toExpire.newChanges());
+
+ // delete snapshot
+ FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
}
- for (String manifestName : manifestsToDelete) {
-
FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(manifestName));
+
+ // actual delete of manifest files
+ for (ManifestFileMeta manifest : manifestsToDelete) {
Review comment:
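Should we delete the manifest files before deleting the manifest lists?
That order ensures all files are eventually deleted. If expiration stops partway, the manifest lists still reference the remaining manifests, so they can still be found and removed instead of being orphaned.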
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]