This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0e0ca31a3 [format] Supports estimation of data size in memory for
better control of file size (#1350)
0e0ca31a3 is described below
commit 0e0ca31a3f438480e9f3db70bb07c187f3af74de
Author: liming.1018 <[email protected]>
AuthorDate: Tue Jun 13 22:15:33 2023 +0800
[format] Supports estimation of data size in memory for better control of
file size (#1350)
---
.../org/apache/paimon/format/FormatWriter.java | 11 ++++++
.../main/java/org/apache/paimon/io/FileWriter.java | 8 ----
.../org/apache/paimon/io/RollingFileWriter.java | 17 +-------
.../org/apache/paimon/io/SingleFileWriter.java | 9 +----
.../apache/paimon/format/FlushingFileFormat.java | 6 +++
.../apache/paimon/io/RollingFileWriterTest.java | 23 ++++++-----
.../apache/paimon/format/avro/AvroFileFormat.java | 9 +++++
.../apache/paimon/format/orc/OrcWriterFactory.java | 2 +-
.../paimon/format/orc/writer/OrcBulkWriter.java | 45 +++++++++++++++++++++-
.../format/parquet/writer/ParquetBulkWriter.java | 5 +++
10 files changed, 94 insertions(+), 41 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
index 3a9ccba38..51e1caeac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java
@@ -60,4 +60,15 @@ public interface FormatWriter {
* @throws IOException Thrown if the finalization fails.
*/
void finish() throws IOException;
+
+ /**
+ * Check if the writer has reached the <code>targetSize</code>.
+ *
+ * @param suggestedCheck Hint from the caller that the size should be checked now;
+ * implementations may ignore the hint and decide for themselves when to check.
+ * @param targetSize The target size to compare the writer's size against.
+ * @return true if the target size was reached, otherwise false.
+ * @throws IOException Thrown if calculating the length fails.
+ */
+ boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws
IOException;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java
index 94d67b67d..182379ed9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileWriter.java
@@ -100,14 +100,6 @@ public interface FileWriter<T, R> extends Closeable {
*/
long recordCount();
- /**
- * The estimated length of the current writer.
- *
- * @return the estimated length.
- * @throws IOException if encounter any IO error.
- */
- long length() throws IOException;
-
/**
* Abort to clear orphan file(s) if encounter any error.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
index 13fc5c5dc..8bc8e41f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
@@ -48,7 +48,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T,
List<R>> {
private final List<R> results;
private SingleFileWriter<T, R> currentWriter = null;
- private long lengthOfClosedFiles = 0L;
private long recordCount = 0;
private boolean closed = false;
@@ -67,9 +66,8 @@ public class RollingFileWriter<T, R> implements FileWriter<T,
List<R>> {
@VisibleForTesting
boolean rollingFile() throws IOException {
- // query writer's length per 1000 records
- return recordCount % CHECK_ROLLING_RECORD_CNT == 0
- && currentWriter.length() >= targetFileSize;
+ return currentWriter.reachTargetSize( // the roll decision is now made by the current writer
+ recordCount % CHECK_ROLLING_RECORD_CNT == 0, targetFileSize); // record-count condition is passed only as the suggestedCheck hint
}
@Override
@@ -107,7 +105,6 @@ public class RollingFileWriter<T, R> implements
FileWriter<T, List<R>> {
return;
}
- lengthOfClosedFiles += currentWriter.length();
currentWriter.close();
results.add(currentWriter.result());
currentWriter = null;
@@ -118,16 +115,6 @@ public class RollingFileWriter<T, R> implements
FileWriter<T, List<R>> {
return recordCount;
}
- @Override
- public long length() throws IOException {
- long length = lengthOfClosedFiles;
- if (currentWriter != null) {
- length += currentWriter.length();
- }
-
- return length;
- }
-
@Override
public void abort() {
for (FileWriter<T, R> writer : openedWriters) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index da7dd8681..57cdc920e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -114,13 +114,8 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
return recordCount;
}
- @Override
- public long length() throws IOException {
- if (closed) {
- return length;
- } else {
- return out.getPos();
- }
+ public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
throws IOException { // replaces length(): callers ask a yes/no question instead of reading a position
+ return writer.reachTargetSize(suggestedCheck, targetSize); // both arguments are forwarded unchanged to the writer field
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
index d2f1829cf..48b4170b0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
@@ -69,6 +69,12 @@ public class FlushingFileFormat extends FileFormat {
public void finish() throws IOException {
wrapped.finish();
}
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long
targetSize)
+ throws IOException { // pure delegation: this wrapper adds no size logic of its own
+ return wrapped.reachTargetSize(suggestedCheck, targetSize); // hint and target are passed through unchanged
+ }
};
};
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index f19f036d2..df183a535 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -30,13 +30,15 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
+import static org.apache.paimon.CoreOptions.FileFormatType;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link RollingFileWriter}. */
@@ -55,9 +57,8 @@ public class RollingFileWriterTest {
private RollingFileWriter<InternalRow, DataFileMeta> rollingFileWriter;
- @BeforeEach
- public void beforeEach() {
- FileFormat fileFormat = FileFormat.fromIdentifier("avro", new
Options());
+ public void initialize(String identifier) {
+ FileFormat fileFormat = FileFormat.fromIdentifier(identifier, new
Options());
rollingFileWriter =
new RollingFileWriter<>(
() ->
@@ -80,13 +81,17 @@ public class RollingFileWriterTest {
TARGET_FILE_SIZE);
}
- @Test
- public void testRolling() throws IOException {
+ @ParameterizedTest
+ @EnumSource(FileFormatType.class)
+ public void testRolling(FileFormatType formatType) throws IOException {
+ initialize(formatType.toString());
+ int checkInterval =
+ formatType == FileFormatType.ORC ?
VectorizedRowBatch.DEFAULT_SIZE : 1000;
for (int i = 0; i < 3000; i++) {
rollingFileWriter.write(GenericRow.of(i));
- if (i < 1000) {
+ if (i < checkInterval) {
assertFileNum(1);
- } else if (i < 2000) {
+ } else if (i < checkInterval * 2) {
assertFileNum(2);
} else {
assertFileNum(3);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index faf8a794b..11d4d3713 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -167,6 +167,15 @@ public class AvroFileFormat extends FileFormat {
public void finish() throws IOException {
writer.finish();
}
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long
targetSize)
+ throws IOException { // size is taken from the output stream position
+ if (out != null) { // a stream is available, so its position can be queried
+ return suggestedCheck && out.getPos() >= targetSize; // short-circuit: getPos() is called only when suggestedCheck is true
+ }
+ throw new IOException("Failed to get stream length: no
open stream"); // reached whenever out is null, even if suggestedCheck is false
+ }
};
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
index 7299d48e9..299f55e76 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
@@ -110,7 +110,7 @@ public class OrcWriterFactory implements
FormatWriterFactory {
// to the give output stream directly. However, the path would be used
as
// the key of writer in the ORC memory manager, thus we need to make
it unique.
Path unusedPath = new Path(UUID.randomUUID().toString());
- return new OrcBulkWriter(vectorizer, new WriterImpl(null, unusedPath,
opts));
+ return new OrcBulkWriter(vectorizer, new WriterImpl(null, unusedPath,
opts), out);
}
@VisibleForTesting
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
index 70064592f..f55db58b9 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
@@ -20,11 +20,16 @@ package org.apache.paimon.format.orc.writer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.TreeWriter;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -34,8 +39,13 @@ public class OrcBulkWriter implements FormatWriter {
private final Writer writer;
private final Vectorizer<InternalRow> vectorizer;
private final VectorizedRowBatch rowBatch;
+ private final PositionOutputStream underlyingStream;
+ private final TreeWriter treeWriter;
- public OrcBulkWriter(Vectorizer<InternalRow> vectorizer, Writer writer) {
+ public OrcBulkWriter(
+ Vectorizer<InternalRow> vectorizer,
+ Writer writer,
+ PositionOutputStream underlyingStream) {
this.vectorizer = checkNotNull(vectorizer);
this.writer = checkNotNull(writer);
this.rowBatch = vectorizer.getSchema().createRowBatch();
@@ -43,6 +53,9 @@ public class OrcBulkWriter implements FormatWriter {
// Configure the vectorizer with the writer so that users can add
// metadata on the fly through the Vectorizer#vectorize(...) method.
this.vectorizer.setWriter(this.writer);
+ this.underlyingStream = underlyingStream;
+ // TODO: Access this hidden field directly after upgrading to ORC 1.7.4
+ this.treeWriter = getHiddenFiledInORC("treeWriter");
}
@Override
@@ -67,4 +80,34 @@ public class OrcBulkWriter implements FormatWriter {
flush();
writer.close();
}
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
throws IOException { // suggestedCheck is not consulted by this implementation
+ return rowBatch.size == 0 && length() >= targetSize; // length() is evaluated only while the row batch holds no rows (presumably right after a flush; confirm against flush())
+ }
+
+ private long length() throws IOException { // estimated length: stream position plus a fraction of the tree writer's memory estimate
+ long estimateMemory = treeWriter.estimateMemory(); // memory estimate reported by the ORC TreeWriter
+ long fileLength = underlyingStream.getPos(); // current position of the underlying output stream
+
+ // This value is an estimate, not the actual length: 20% of the memory estimate is added (presumably an assumed compression ratio; TODO confirm).
+ return (long) Math.ceil(fileLength + estimateMemory * 0.2);
+ }
+
+ @SuppressWarnings("unchecked") // covers the unchecked (T) cast on the reflective read below
+ private <T> T getHiddenFiledInORC(String fieldName) { // reads a field of the ORC writer via reflection; NOTE(review): "Filed" in the name looks like a typo for "Field"
+ try {
+ Field treeWriterField =
writer.getClass().getDeclaredField(fieldName); // looked up on the writer's runtime class only; getDeclaredField does not search superclasses
+ AccessController.doPrivileged(
+ (PrivilegedAction<Void>)
+ () -> {
+ treeWriterField.setAccessible(true); // lift Java access checks for this one field
+ return null;
+ });
+ return (T) treeWriterField.get(writer); // T is chosen by the caller; the cast itself is not checked here
+ } catch (Exception e) { // any failure (missing field, access denied, ...) is rethrown unchecked with the cause attached
+ throw new RuntimeException(
+ "Cannot get " + fieldName + " from " +
writer.getClass().getName(), e);
+ }
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
index b60384269..c621ff13a 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java
@@ -56,4 +56,9 @@ public class ParquetBulkWriter implements FormatWriter {
public void finish() throws IOException {
parquetWriter.close();
}
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
throws IOException { // size is taken from the Parquet writer's reported data size
+ return suggestedCheck && parquetWriter.getDataSize() >= targetSize; // short-circuit: getDataSize() is called only when suggestedCheck is true
+ }
}