kbendick commented on a change in pull request #3293:
URL: https://github.com/apache/iceberg/pull/3293#discussion_r756456476



##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
##########
@@ -139,9 +150,9 @@ public Metrics metrics() {
   public long length() {
     try {
       if (closed) {
-        return writer.getPos();
+        return writer().getPos();

Review comment:
       Testing suggestion: it seems another flag may be needed to track whether the writer has been instantiated at all, so that an exception is thrown if `getPos` is called on a writer that is not open (or in similar situations). That is likely what's needed to debug the tests, along with the other places where `writer` has been replaced with `writer()`. The same applies to the calls for the footer.
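   A minimal sketch of what that flag could look like, assuming the file writer is created lazily on the write path. `FileWriterStub` and `LazyPositionedWriter` are hypothetical stand-ins for the Parquet file writer and `ParquetWriter`, not the actual Iceberg classes; the point is only that `getPos` fails loudly instead of silently creating a fresh writer.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the underlying Parquet file writer (not the real class).
class FileWriterStub {
  private long pos = 0L;

  long getPos() {
    return pos;
  }

  void write(int bytes) {
    pos += bytes;
  }
}

// Hypothetical wrapper around a lazily created file writer. It records whether the
// writer exists so that position queries throw instead of creating one.
class LazyPositionedWriter {
  private final Supplier<FileWriterStub> factory;
  private FileWriterStub writer = null;
  private boolean instantiated = false; // the extra flag suggested above
  private boolean closed = false;

  LazyPositionedWriter(Supplier<FileWriterStub> factory) {
    this.factory = factory;
  }

  boolean isInstantiated() {
    return instantiated;
  }

  // Only the write path may instantiate the file writer.
  void write(int bytes) {
    if (closed) {
      throw new IllegalStateException("Cannot write: writer is closed");
    }
    if (!instantiated) {
      writer = factory.get();
      instantiated = true;
    }
    writer.write(bytes);
  }

  // Throws rather than silently creating a fresh (empty) writer.
  long getPos() {
    if (!instantiated) {
      throw new IllegalStateException("getPos called before the writer was instantiated");
    }
    return writer.getPos();
  }

  void close() {
    closed = true;
  }
}
```

   With this in place, a test that hits `getPos` (or a footer call) before any write fails with a clear message rather than reporting the position of a brand-new, empty writer.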
   
   I think that somewhere between Flink tasks and operators, across checkpoints, the instantiated writer (and its underlying buffer) is not being preserved, whereas the previous behavior would have preserved it.
   
   Alternatively, for testing, you could try forcing a flush around the calls that now silently re-instantiate the writer just by calling `writer()`, where previously the writer was created eagerly (and that writer was used to determine whether the file should be written).
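   For the testing angle, a hypothetical helper along these lines could count how many times a lazy `writer()`-style accessor creates a writer, so a test can assert that it happens exactly once per file. `CountingFactory`, `LazyHolder`, and `reset()` are illustrative names, not Iceberg or Flink APIs; `reset()` only simulates the writer state being lost, which is still an unconfirmed suspicion.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical test helper: wraps a writer factory and counts how many writers it creates.
class CountingFactory<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private final AtomicInteger created = new AtomicInteger();

  CountingFactory(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T get() {
    created.incrementAndGet();
    return delegate.get();
  }

  int createdCount() {
    return created.get();
  }
}

// Hypothetical lazy accessor mirroring a `writer()` method: creates on first use, then reuses.
class LazyHolder<T> {
  private final Supplier<T> factory;
  private T instance = null;

  LazyHolder(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }

  // Simulates the cached instance being lost (e.g. a field not carried across a restore).
  void reset() {
    instance = null;
  }
}
```

   A count greater than one for a single data file would point to exactly the kind of silent re-instantiation described above.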
   
   In particular, mixing `writer()` with `writeStore` seems somewhat suspicious to me, given Flink's isolated task and operator state and how that state gets serialized across the cluster.
   
   Either way, I think you need to ensure that the data in the Parquet writer stays buffered for Flink.

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
##########
@@ -139,9 +150,9 @@ public Metrics metrics() {
   public long length() {
     try {
       if (closed) {
-        return writer.getPos();
+        return writer().getPos();
       } else {
-        return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+        return writer().getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);

Review comment:
       This line is what I suspect is causing issues in Flink in particular.
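   If this line is the culprit, one option is a `length()` that only reads the file writer when it already exists, so asking for the length never creates a writer as a side effect. A minimal sketch under that assumption: `WriteStoreStub`, `PositionedFile`, and `LengthEstimatingWriter` are hypothetical stand-ins, and the estimate keeps the same shape as the diff (buffered bytes are added only when `isColumnFlushNeeded()` is true).

```java
// Hypothetical stand-in for the column write store consulted by length().
class WriteStoreStub {
  private final long flushThreshold;
  private long bufferedSize = 0L;

  WriteStoreStub(long flushThreshold) {
    this.flushThreshold = flushThreshold;
  }

  void buffer(long bytes) {
    bufferedSize += bytes;
  }

  boolean isColumnFlushNeeded() {
    return bufferedSize >= flushThreshold;
  }

  long getBufferedSize() {
    return bufferedSize;
  }
}

// Hypothetical stand-in for the file writer.
class PositionedFile {
  private long pos = 0L;

  long getPos() {
    return pos;
  }
}

// Hypothetical writer whose length() never instantiates the file writer.
class LengthEstimatingWriter {
  private final WriteStoreStub writeStore;
  private PositionedFile writer = null; // would be created lazily on the write path only
  private boolean closed = false;

  LengthEstimatingWriter(WriteStoreStub writeStore) {
    this.writeStore = writeStore;
  }

  boolean hasFileWriter() {
    return writer != null;
  }

  long length() {
    // Bytes already written to the file, or 0 if no file writer exists yet.
    long pos = (writer != null) ? writer.getPos() : 0L;
    if (closed) {
      return pos;
    }
    // Same estimate as the diff: add buffered bytes only when a flush is needed.
    return pos + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0L);
  }

  void close() {
    closed = true;
  }
}
```

   The design choice here is simply to treat "no writer yet" as position 0 instead of calling a lazy accessor, which keeps `length()` a pure read.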




-- 
This is an automated message from the Apache Git Service.