This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e0ca31a3 [format] Supports estimation of data size in memory for better control of file size (#1350)
0e0ca31a3 is described below

commit 0e0ca31a3f438480e9f3db70bb07c187f3af74de
Author: liming.1018 <[email protected]>
AuthorDate: Tue Jun 13 22:15:33 2023 +0800

    [format] Supports estimation of data size in memory for better control of file size (#1350)
---
 .../org/apache/paimon/format/FormatWriter.java     | 11 ++++++
 .../main/java/org/apache/paimon/io/FileWriter.java |  8 ----
 .../org/apache/paimon/io/RollingFileWriter.java    | 17 +-------
 .../org/apache/paimon/io/SingleFileWriter.java     |  9 +----
 .../apache/paimon/format/FlushingFileFormat.java   |  6 +++
 .../apache/paimon/io/RollingFileWriterTest.java    | 23 ++++++-----
 .../apache/paimon/format/avro/AvroFileFormat.java  |  9 +++++
 .../apache/paimon/format/orc/OrcWriterFactory.java |  2 +-
 .../paimon/format/orc/writer/OrcBulkWriter.java    | 45 +++++++++++++++++++++-
 .../format/parquet/writer/ParquetBulkWriter.java   |  5 +++
 10 files changed, 94 insertions(+), 41 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
index 3a9ccba38..51e1caeac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
@@ -60,4 +60,15 @@ public interface FormatWriter {
      * @throws IOException Thrown if the finalization fails.
      */
     void finish() throws IOException;
+
+    /**
+     * Check if the writer has reached the <code>targetSize</code>.
+     *
+     * @param suggestedCheck Whether it needs to be checked, but subclasses can also decide whether
+     *     to check it themselves.
+     * @param targetSize The size of the target.
+     * @return true if the target size was reached, otherwise false.
+     * @throws IOException Thrown if calculating the length fails.
+     */
+    boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException;
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java
index 94d67b67d..182379ed9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java
@@ -100,14 +100,6 @@ public interface FileWriter<T, R> extends Closeable {
      */
     long recordCount();
 
-    /**
-     * The estimated length of the current writer.
-     *
-     * @return the estimated length.
-     * @throws IOException if encounter any IO error.
-     */
-    long length() throws IOException;
-
     /**
      * Abort to clear orphan file(s) if encounter any error.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
index 13fc5c5dc..8bc8e41f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
@@ -48,7 +48,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
     private final List<R> results;
 
     private SingleFileWriter<T, R> currentWriter = null;
-    private long lengthOfClosedFiles = 0L;
     private long recordCount = 0;
     private boolean closed = false;
 
@@ -67,9 +66,8 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
     @VisibleForTesting
     boolean rollingFile() throws IOException {
-        // query writer's length per 1000 records
-        return recordCount % CHECK_ROLLING_RECORD_CNT == 0
-                && currentWriter.length() >= targetFileSize;
+        return currentWriter.reachTargetSize(
+                recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize);
     }
 
     @Override
@@ -107,7 +105,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
             return;
         }
 
-        lengthOfClosedFiles += currentWriter.length();
         currentWriter.close();
         results.add(currentWriter.result());
         currentWriter = null;
@@ -118,16 +115,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
         return recordCount;
     }
 
-    @Override
-    public long length() throws IOException {
-        long length = lengthOfClosedFiles;
-        if (currentWriter != null) {
-            length += currentWriter.length();
-        }
-
-        return length;
-    }
-
     @Override
     public void abort() {
         for (FileWriter<T, R> writer : openedWriters) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index da7dd8681..57cdc920e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -114,13 +114,8 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
         return recordCount;
     }
 
-    @Override
-    public long length() throws IOException {
-        if (closed) {
-            return length;
-        } else {
-            return out.getPos();
-        }
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+        return writer.reachTargetSize(suggestedCheck, targetSize);
     }
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
index d2f1829cf..48b4170b0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
@@ -69,6 +69,12 @@ public class FlushingFileFormat extends FileFormat {
                 public void finish() throws IOException {
                     wrapped.finish();
                 }
+
+                @Override
+                public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
+                        throws IOException {
+                    return wrapped.reachTargetSize(suggestedCheck, targetSize);
+                }
             };
         };
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index f19f036d2..df183a535 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -30,13 +30,15 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
 
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
 
+import static org.apache.paimon.CoreOptions.FileFormatType;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link RollingFileWriter}. */
@@ -55,9 +57,8 @@ public class RollingFileWriterTest {
 
     private RollingFileWriter<InternalRow, DataFileMeta> rollingFileWriter;
 
-    @BeforeEach
-    public void beforeEach() {
-        FileFormat fileFormat = FileFormat.fromIdentifier("avro", new Options());
+    public void initialize(String identifier) {
+        FileFormat fileFormat = FileFormat.fromIdentifier(identifier, new Options());
         rollingFileWriter =
                 new RollingFileWriter<>(
                         () ->
@@ -80,13 +81,17 @@ public class RollingFileWriterTest {
                         TARGET_FILE_SIZE);
     }
 
-    @Test
-    public void testRolling() throws IOException {
+    @ParameterizedTest
+    @EnumSource(FileFormatType.class)
+    public void testRolling(FileFormatType formatType) throws IOException {
+        initialize(formatType.toString());
+        int checkInterval =
+                formatType == FileFormatType.ORC ? VectorizedRowBatch.DEFAULT_SIZE : 1000;
         for (int i = 0; i < 3000; i++) {
             rollingFileWriter.write(GenericRow.of(i));
-            if (i < 1000) {
+            if (i < checkInterval) {
                 assertFileNum(1);
-            } else if (i < 2000) {
+            } else if (i < checkInterval * 2) {
                 assertFileNum(2);
             } else {
                 assertFileNum(3);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index faf8a794b..11d4d3713 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -167,6 +167,15 @@ public class AvroFileFormat extends FileFormat {
                 public void finish() throws IOException {
                     writer.finish();
                 }
+
+                @Override
+                public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
+                        throws IOException {
+                    if (out != null) {
+                        return suggestedCheck && out.getPos() >= targetSize;
+                    }
+                    throw new IOException("Failed to get stream length: no open stream");
+                }
             };
         }
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
index 7299d48e9..299f55e76 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
@@ -110,7 +110,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
         // to the give output stream directly. However, the path would be used as
         // the key of writer in the ORC memory manager, thus we need to make it unique.
         Path unusedPath = new Path(UUID.randomUUID().toString());
-        return new OrcBulkWriter(vectorizer, new WriterImpl(null, unusedPath, opts));
+        return new OrcBulkWriter(vectorizer, new WriterImpl(null, unusedPath, opts), out);
     }
 
     @VisibleForTesting
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
index 70064592f..f55db58b9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
@@ -20,11 +20,16 @@ package org.apache.paimon.format.orc.writer;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.TreeWriter;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
@@ -34,8 +39,13 @@ public class OrcBulkWriter implements FormatWriter {
     private final Writer writer;
     private final Vectorizer<InternalRow> vectorizer;
     private final VectorizedRowBatch rowBatch;
+    private final PositionOutputStream underlyingStream;
+    private final TreeWriter treeWriter;
 
-    public OrcBulkWriter(Vectorizer<InternalRow> vectorizer, Writer writer) {
+    public OrcBulkWriter(
+            Vectorizer<InternalRow> vectorizer,
+            Writer writer,
+            PositionOutputStream underlyingStream) {
         this.vectorizer = checkNotNull(vectorizer);
         this.writer = checkNotNull(writer);
         this.rowBatch = vectorizer.getSchema().createRowBatch();
@@ -43,6 +53,9 @@ public class OrcBulkWriter implements FormatWriter {
         // Configure the vectorizer with the writer so that users can add
         // metadata on the fly through the Vectorizer#vectorize(...) method.
         this.vectorizer.setWriter(this.writer);
+        this.underlyingStream = underlyingStream;
+        // TODO: Turn to access these hidden field directly after upgrade to ORC 1.7.4
+        this.treeWriter = getHiddenFiledInORC("treeWriter");
     }
 
     @Override
@@ -67,4 +80,34 @@ public class OrcBulkWriter implements FormatWriter {
         flush();
         writer.close();
     }
+
+    @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+        return rowBatch.size == 0 && length() >= targetSize;
+    }
+
+    private long length() throws IOException {
+        long estimateMemory = treeWriter.estimateMemory();
+        long fileLength = underlyingStream.getPos();
+
+        // This value is estimated, not actual.
+        return (long) Math.ceil(fileLength + estimateMemory * 0.2);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T getHiddenFiledInORC(String fieldName) {
+        try {
+            Field treeWriterField = writer.getClass().getDeclaredField(fieldName);
+            AccessController.doPrivileged(
+                    (PrivilegedAction<Void>)
+                            () -> {
+                                treeWriterField.setAccessible(true);
+                                return null;
+                            });
+            return (T) treeWriterField.get(writer);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Cannot get " + fieldName + " from " + writer.getClass().getName(), e);
+        }
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
index b60384269..c621ff13a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
@@ -56,4 +56,9 @@ public class ParquetBulkWriter implements FormatWriter {
     public void finish() throws IOException {
         parquetWriter.close();
     }
+
+    @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+        return suggestedCheck && parquetWriter.getDataSize() >= targetSize;
+    }
 }

Reply via email to