This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a51c10929 [core] Refactor Multiple Partitions Write for Append table
(#1994)
a51c10929 is described below
commit a51c10929d99cd3d2964c28ce1bcaec450f4a9ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 12 22:43:12 2023 +0800
[core] Refactor Multiple Partitions Write for Append table (#1994)
---
docs/content/concepts/append-only-table.md | 25 +++-----------
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../org/apache/paimon/append/AppendOnlyWriter.java | 39 +++++++++++-----------
.../org/apache/paimon/disk/ExternalBuffer.java | 13 ++++++--
.../org/apache/paimon/disk/InMemoryBuffer.java | 5 +--
.../{InternalRowBuffer.java => RowBuffer.java} | 8 ++---
.../paimon/operation/AppendOnlyFileStoreWrite.java | 2 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 6 ++--
.../org/apache/paimon/disk/ExternalBufferTest.java | 7 ++--
.../org/apache/paimon/disk/InMemoryBufferTest.java | 4 +--
.../sink/index/GlobalIndexAssignerOperator.java | 9 +++--
11 files changed, 55 insertions(+), 65 deletions(-)
diff --git a/docs/content/concepts/append-only-table.md
b/docs/content/concepts/append-only-table.md
index e8a87b699..b3bfa8168 100644
--- a/docs/content/concepts/append-only-table.md
+++ b/docs/content/concepts/append-only-table.md
@@ -279,27 +279,12 @@ CREATE TABLE MyTable (
{{< /tabs >}}
## Multiple Partitions Write
+
While writing multiple partitions in a single insert job, we may get an
out-of-memory error if
too many records arrive between two checkpoints.
-On 0.6 version, introduced a `write-buffer-for-append` option for append-only
table. Setting this
-parameter to true, we will cache the records use Segment Pool to avoid OOM.
-
-### Influence
-If we also set `write-buffer-spillable` to true, we can spill the records to
disk by serializer.
-But this may cause the checkpoint acknowledge delay and have an influence on
sink speed.
-
-But if we don't set `write-buffer-spillable`, once we run out of memory in
segment poll, we will flush
-them to filesystem as a complete data file. This may cause too many small
files, and make more pressure on
-compaction. We need to trade off carefully.
+You can use the `write-buffer-for-append` option for append-only tables. When this
parameter is set to true, the writer will cache
+the records using the Segment Pool to avoid OOM.
-### Example
-```sql
-CREATE TABLE MyTable (
- product_id BIGINT,
- price DOUBLE,
- sales BIGINT
-) WITH (
- 'write-buffer-for-append' = 'true'
-);
-```
\ No newline at end of file
+You can also set `write-buffer-spillable` to true so that the writer can spill the
records to disk. This reduces the number of small
+files as much as possible.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index e4f233ef2..84723a2fd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1014,7 +1014,7 @@ public class CoreOptions implements Serializable {
return
options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore ||
!isStreaming);
}
- public boolean useWriteBuffer() {
+ public boolean useWriteBufferForAppend() {
return options.get(WRITE_BUFFER_FOR_APPEND);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 7b0909568..51ddbbed0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.InternalRowBuffer;
+import org.apache.paimon.disk.RowBuffer;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
@@ -70,7 +70,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
- private final boolean spillable;
private final SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
@@ -104,11 +103,11 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
- this.spillable = spillable;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
- sinkWriter = createSinkWrite(useWriteBuffer);
+ this.sinkWriter =
+ useWriteBuffer ? new BufferedSinkWriter(spillable) : new
DirectSinkWriter();
if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -190,14 +189,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
sinkWriter.close();
}
- private SinkWriter createSinkWrite(boolean useWriteBuffer) {
- if (useWriteBuffer) {
- return new BufferedSinkWriter();
- } else {
- return new DirectSinkWriter();
- }
- }
-
private RowDataRollingFileWriter createRollingRowWriter() {
return new RowDataRollingFileWriter(
fileIO,
@@ -254,7 +245,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
}
@VisibleForTesting
- InternalRowBuffer getWriteBuffer() {
+ RowBuffer getWriteBuffer() {
if (sinkWriter instanceof BufferedSinkWriter) {
return ((BufferedSinkWriter) sinkWriter).writeBuffer;
} else {
@@ -268,7 +259,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
}
/** Internal interface to Sink Data from input. */
- interface SinkWriter {
+ private interface SinkWriter {
boolean write(InternalRow data) throws IOException;
@@ -287,10 +278,13 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
*/
private class DirectSinkWriter implements SinkWriter {
- private RowDataRollingFileWriter writer = createRollingRowWriter();
+ private RowDataRollingFileWriter writer;
@Override
public boolean write(InternalRow data) throws IOException {
+ if (writer == null) {
+ writer = createRollingRowWriter();
+ }
writer.write(data);
return true;
}
@@ -301,7 +295,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
if (writer != null) {
writer.close();
flushedFiles.addAll(writer.result());
- writer = createRollingRowWriter();
+ writer = null;
}
return flushedFiles;
}
@@ -331,7 +325,13 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
*/
private class BufferedSinkWriter implements SinkWriter {
- InternalRowBuffer writeBuffer;
+ private final boolean spillable;
+
+ private RowBuffer writeBuffer;
+
+ private BufferedSinkWriter(boolean spillable) {
+ this.spillable = spillable;
+ }
@Override
public boolean write(InternalRow data) throws IOException {
@@ -344,8 +344,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
if (writeBuffer != null) {
writeBuffer.complete();
RowDataRollingFileWriter writer = createRollingRowWriter();
- try (InternalRowBuffer.InternalRowBufferIterator iterator =
- writeBuffer.newIterator()) {
+ try (RowBuffer.RowBufferIterator iterator =
writeBuffer.newIterator()) {
while (iterator.advanceNext()) {
writer.write(iterator.getRow());
}
@@ -375,7 +374,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
@Override
public void setMemoryPool(MemorySegmentPool memoryPool) {
writeBuffer =
- InternalRowBuffer.getBuffer(
+ RowBuffer.getBuffer(
ioManager,
memoryPool,
new InternalRowSerializer(writeSchema),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 2503bf8e6..702e8914e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -38,7 +38,7 @@ import java.util.List;
import static org.apache.paimon.utils.Preconditions.checkState;
/** An external buffer for storing rows, it will spill the data to disk when
the memory is full. */
-public class ExternalBuffer implements InternalRowBuffer {
+public class ExternalBuffer implements RowBuffer {
private static final Logger LOG =
LoggerFactory.getLogger(ExternalBuffer.class);
@@ -78,6 +78,7 @@ public class ExternalBuffer implements InternalRowBuffer {
new InMemoryBuffer(pool,
(AbstractRowDataSerializer<InternalRow>) serializer);
}
+ @Override
public void reset() {
clearChannels();
inMemoryBuffer.reset();
@@ -85,6 +86,7 @@ public class ExternalBuffer implements InternalRowBuffer {
addCompleted = false;
}
+ @Override
public boolean put(InternalRow row) throws IOException {
checkState(!addCompleted, "This buffer has add completed.");
if (!inMemoryBuffer.put(row)) {
@@ -102,12 +104,13 @@ public class ExternalBuffer implements InternalRowBuffer {
return true;
}
+ @Override
public void complete() {
addCompleted = true;
}
@Override
- public InternalRowBufferIterator newIterator() {
+ public RowBufferIterator newIterator() {
checkState(addCompleted, "This buffer has not add completed.");
return new BufferIterator();
}
@@ -157,10 +160,12 @@ public class ExternalBuffer implements InternalRowBuffer {
inMemoryBuffer.reset();
}
+ @Override
public int size() {
return numRows;
}
+ @Override
public long memoryOccupancy() {
return inMemoryBuffer.memoryOccupancy();
}
@@ -181,7 +186,7 @@ public class ExternalBuffer implements InternalRowBuffer {
}
/** Iterator of external buffer. */
- public class BufferIterator implements InternalRowBufferIterator {
+ public class BufferIterator implements RowBufferIterator {
private MutableObjectIterator<BinaryRow> currentIterator;
private final BinaryRow reuse = binaryRowSerializer.createInstance();
@@ -216,6 +221,7 @@ public class ExternalBuffer implements InternalRowBuffer {
closed = true;
}
+ @Override
public boolean advanceNext() {
checkValidity();
@@ -246,6 +252,7 @@ public class ExternalBuffer implements InternalRowBuffer {
return true;
}
+ @Override
public BinaryRow getRow() {
return row;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index 4758bb183..a9eaf55e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -32,7 +32,8 @@ import java.io.IOException;
import java.util.ArrayList;
/** Only cache {@link InternalRow}s in memory. */
-public class InMemoryBuffer implements InternalRowBuffer {
+public class InMemoryBuffer implements RowBuffer {
+
private final AbstractRowDataSerializer<InternalRow> serializer;
private final ArrayList<MemorySegment> recordBufferSegments;
private final SimpleCollectingOutputView recordCollector;
@@ -126,7 +127,7 @@ public class InMemoryBuffer implements InternalRowBuffer {
}
private static class InMemoryBufferIterator
- implements InternalRowBufferIterator,
MutableObjectIterator<BinaryRow> {
+ implements RowBufferIterator, MutableObjectIterator<BinaryRow> {
private final RandomAccessInputView recordBuffer;
private final AbstractRowDataSerializer<InternalRow> serializer;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
similarity index 90%
rename from
paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
rename to paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index 776db52aa..c5c881331 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -27,7 +27,7 @@ import java.io.Closeable;
import java.io.IOException;
/** Cache buffer for {@link InternalRow}. */
-public interface InternalRowBuffer {
+public interface RowBuffer {
boolean put(InternalRow row) throws IOException;
@@ -39,10 +39,10 @@ public interface InternalRowBuffer {
void reset();
- InternalRowBufferIterator newIterator();
+ RowBufferIterator newIterator();
/** Iterator to fetch record from buffer. */
- interface InternalRowBufferIterator extends Closeable {
+ interface RowBufferIterator extends Closeable {
boolean advanceNext();
@@ -51,7 +51,7 @@ public interface InternalRowBuffer {
void close();
}
- static InternalRowBuffer getBuffer(
+ static RowBuffer getBuffer(
IOManager ioManager,
MemorySegmentPool memoryPool,
AbstractRowDataSerializer<InternalRow> serializer,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index ca1d65cae..dc8b74433 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -95,7 +95,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
this.commitForceCompact = options.commitForceCompact();
this.skipCompaction = options.writeOnly();
this.fileCompression = options.fileCompression();
- this.useWriteBuffer = options.useWriteBuffer();
+ this.useWriteBuffer = options.useWriteBufferForAppend();
this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(),
isStreamingMode);
this.statsCollectors =
StatsCollectorFactories.createStatsFactories(options,
rowType.getFieldNames());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 5699ea415..62a0292a8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -27,7 +27,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.ChannelWithMeta;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.InternalRowBuffer;
+import org.apache.paimon.disk.RowBuffer;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
@@ -329,7 +329,7 @@ public class AppendOnlyWriterTest {
writer.write(row(j, String.valueOf(s), PART));
}
- InternalRowBuffer buffer = writer.getWriteBuffer();
+ RowBuffer buffer = writer.getWriteBuffer();
Assertions.assertThat(buffer.size()).isEqualTo(100);
Assertions.assertThat(buffer.memoryOccupancy()).isLessThanOrEqualTo(16384L);
@@ -343,7 +343,7 @@ public class AppendOnlyWriterTest {
// we give it a small Memory Pool, force it to spill
writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));
- InternalRowBuffer buffer = writer.getWriteBuffer();
+ RowBuffer buffer = writer.getWriteBuffer();
Assertions.assertThat(buffer).isNull();
writer.close();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 6ca73bd6a..53e530e91 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -168,14 +168,13 @@ public class ExternalBufferTest {
buffer.put(row);
}
- private void assertBuffer(List<Long> expected, InternalRowBuffer buffer) {
- InternalRowBuffer.InternalRowBufferIterator iterator =
buffer.newIterator();
+ private void assertBuffer(List<Long> expected, RowBuffer buffer) {
+ RowBuffer.RowBufferIterator iterator = buffer.newIterator();
assertBuffer(expected, iterator);
iterator.close();
}
- private void assertBuffer(
- List<Long> expected, InternalRowBuffer.InternalRowBufferIterator
iterator) {
+ private void assertBuffer(List<Long> expected, RowBuffer.RowBufferIterator
iterator) {
List<Long> values = new ArrayList<>();
while (iterator.advanceNext()) {
values.add(iterator.getRow().getLong(0));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index e1b127714..9bcfdc6c6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -33,7 +33,7 @@ import java.util.Arrays;
import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;
-/** Tests for {@link InternalRowBuffer}. */
+/** Tests for {@link RowBuffer}. */
public class InMemoryBufferTest {
private InternalRowSerializer serializer;
@@ -87,7 +87,7 @@ public class InMemoryBufferTest {
}
Assertions.assertThat(buffer.size()).isEqualTo(100);
- try (InternalRowBuffer.InternalRowBufferIterator iterator =
buffer.newIterator()) {
+ try (RowBuffer.RowBufferIterator iterator = buffer.newIterator()) {
while (iterator.advanceNext()) {
Assertions.assertThat(iterator.getRow()).isEqualTo(binaryRow);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index fd36d4a39..d255cdfbb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -22,7 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.InternalRowBuffer;
+import org.apache.paimon.disk.RowBuffer;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
@@ -60,7 +60,7 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
private final SerializableFunction<T, InternalRow> toRow;
private final SerializableFunction<InternalRow, T> fromRow;
- private transient InternalRowBuffer bootstrapBuffer;
+ private transient RowBuffer bootstrapBuffer;
public GlobalIndexAssignerOperator(
Table table,
@@ -89,7 +89,7 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
long bufferSize =
options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
bootstrapBuffer =
- InternalRowBuffer.getBuffer(
+ RowBuffer.getBuffer(
IOManager.create(ioManager.getSpillingDirectoriesPaths()),
new HeapMemorySegmentPool(bufferSize, (int) pageSize),
new InternalRowSerializer(table.rowType()),
@@ -130,8 +130,7 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
private void endBootstrap() throws Exception {
if (bootstrapBuffer != null) {
bootstrapBuffer.complete();
- try (InternalRowBuffer.InternalRowBufferIterator iterator =
- bootstrapBuffer.newIterator()) {
+ try (RowBuffer.RowBufferIterator iterator =
bootstrapBuffer.newIterator()) {
while (iterator.advanceNext()) {
assigner.process(fromRow.apply(iterator.getRow()));
}