rdblue commented on a change in pull request #1477:
URL: https://github.com/apache/iceberg/pull/1477#discussion_r496321160



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -178,14 +185,20 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, List<DataFile>> dataFilesMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, Byte[]> manifestsMap,
                                     String newFlinkJobId,
-                                    long checkpointId) {
-    NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesMap.headMap(checkpointId, true);
+                                    long checkpointId) throws IOException {
+    NavigableMap<Long, Byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+
+    List<ManifestFile> manifestFiles = Lists.newArrayList();
+    for (Byte[] manifestData : pendingManifestMap.values()) {
+      ManifestFile manifestFile = ManifestFiles.decode(ArrayUtils.toPrimitive(manifestData));
+      manifestFiles.add(manifestFile);
+    }
 
     List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (List<DataFile> dataFiles : pendingFileMap.values()) {
-      pendingDataFiles.addAll(dataFiles);
+    for (ManifestFile manifestFile : manifestFiles) {
+      pendingDataFiles.addAll(FlinkManifest.read(manifestFile, table.io()));

Review comment:
   I think it is okay to do this, but the append operation can also append whole
manifest files. It will either copy the manifest contents into an appropriate
path for you, or take ownership of the manifest and manage it so it doesn't
need to be rewritten. You might consider using `AppendFiles.appendManifest`
instead.

   If you do choose to append manifests, then we would want to know whether the
manifests were written into the table's metadata space (the path came from
`TableOperations.metadataFilePath`) or to a temp location, as in @stevenzwu's
suggestion. In the latter case, we would want to request that Iceberg rewrite
the manifests in the commit operation.
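
   To make the two cases concrete, here is a rough, self-contained sketch of the
decision described above. It does not use the Iceberg classes: `ManifestRef`,
`Append`, `RecordingAppend`, `inMetadataSpace`, and `needsRewrite` are
hypothetical stand-ins, and the prefix check on the metadata location is only
an assumption about how "written into the table's metadata space" might be
detected. How the rewrite request is actually passed to Iceberg is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for org.apache.iceberg.ManifestFile; only the path matters here.
final class ManifestRef {
  final String path;

  ManifestRef(String path) {
    this.path = path;
  }
}

// Stand-in for the part of org.apache.iceberg.AppendFiles used in this sketch.
interface Append {
  Append appendManifest(ManifestRef manifest);

  void commit();
}

// Hypothetical in-memory Append that only records calls, so the sketch runs alone.
final class RecordingAppend implements Append {
  final List<String> appended = new ArrayList<>();
  boolean committed = false;

  @Override
  public Append appendManifest(ManifestRef manifest) {
    appended.add(manifest.path);
    return this;
  }

  @Override
  public void commit() {
    committed = true;
  }
}

final class ManifestAppendSketch {
  private ManifestAppendSketch() {
  }

  // Assumption: a manifest is "in the metadata space" if its path sits under
  // the table's metadata location. The real check may differ.
  static boolean inMetadataSpace(String manifestPath, String metadataLocation) {
    String prefix = metadataLocation.endsWith("/") ? metadataLocation : metadataLocation + "/";
    return manifestPath.startsWith(prefix);
  }

  // True if any manifest lives outside the metadata space (e.g. a temp
  // location); in that case the caller would ask for a rewrite before commit.
  static boolean needsRewrite(List<ManifestRef> manifests, String metadataLocation) {
    for (ManifestRef manifest : manifests) {
      if (!inMetadataSpace(manifest.path, metadataLocation)) {
        return true;
      }
    }
    return false;
  }

  // Appends whole manifests instead of reading them back into data files.
  static void appendAll(Append append, List<ManifestRef> manifests) {
    for (ManifestRef manifest : manifests) {
      append.appendManifest(manifest);
    }
    append.commit();
  }

  public static void main(String[] args) {
    List<ManifestRef> manifests = Arrays.asList(
        new ManifestRef("s3://bucket/table/metadata/m1.avro"),
        new ManifestRef("/tmp/flink/m2.avro"));
    System.out.println(needsRewrite(manifests, "s3://bucket/table/metadata"));
    appendAll(new RecordingAppend(), manifests);
  }
}
```

   With the real API, `appendAll` would correspond to calling
`AppendFiles.appendManifest` once per decoded `ManifestFile`, replacing the
loop that reads each manifest back into `pendingDataFiles`.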






