This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6757f5648 [core] Add compression for ExternalBuffer. (#3146)
6757f5648 is described below
commit 6757f56482fd6ba9503c439b2a0e44e30af5e3cd
Author: YeJunHao <[email protected]>
AuthorDate: Wed Apr 3 09:48:17 2024 +0800
[core] Add compression for ExternalBuffer. (#3146)
---
.../apache/paimon/memory/MemorySegmentPool.java | 2 +-
.../org/apache/paimon/append/AppendOnlyWriter.java | 15 +++-
.../paimon/crosspartition/GlobalIndexAssigner.java | 3 +-
.../paimon/disk/BufferFileReaderInputView.java | 99 ----------------------
.../apache/paimon/disk/ChannelReaderInputView.java | 33 ++++++++
.../paimon/disk/ChannelWriterOutputView.java | 8 ++
.../org/apache/paimon/disk/ExternalBuffer.java | 34 +++++---
.../java/org/apache/paimon/disk/RowBuffer.java | 5 +-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 3 +
.../apache/paimon/append/AppendOnlyWriterTest.java | 1 +
.../org/apache/paimon/disk/ExternalBufferTest.java | 7 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 1 +
12 files changed, 90 insertions(+), 121 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
index 9ea5fa3f6..18538b2fa 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
@@ -36,7 +36,7 @@ public interface MemorySegmentPool extends MemorySegmentSource {
/**
* Get the page size of each page this pool holds.
*
- * @return the page size
+ * @return the page size, the bytes size in one page.
*/
int pageSize();
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9566dd372..4b00eae1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -71,6 +71,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
+ private final String spillCompression;
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
@@ -93,6 +94,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
+ String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize) {
this.fileIO = fileIO;
@@ -109,13 +111,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
+ this.spillCompression = spillCompression;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;
this.sinkWriter =
useWriteBuffer
- ? new BufferedSinkWriter(spillable, maxDiskSize)
+ ? new BufferedSinkWriter(spillable, maxDiskSize,
spillCompression)
: new DirectSinkWriter();
if (increment != null) {
@@ -211,7 +214,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
trySyncLatestCompaction(true);
sinkWriter.close();
- sinkWriter = new BufferedSinkWriter(true, maxDiskSize);
+ sinkWriter = new BufferedSinkWriter(true, maxDiskSize,
spillCompression);
sinkWriter.setMemoryPool(memorySegmentPool);
}
}
@@ -378,11 +381,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final MemorySize maxDiskSize;
+ private final String compression;
+
private RowBuffer writeBuffer;
- private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) {
+ private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize,
String compression) {
this.spillable = spillable;
this.maxDiskSize = maxDiskSize;
+ this.compression = compression;
}
@Override
@@ -439,7 +445,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
memoryPool,
new InternalRowSerializer(writeSchema),
spillable,
- maxDiskSize);
+ maxDiskSize,
+ compression);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index c53d055d5..819937984 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -171,7 +171,8 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
coreOptions.writeBufferSize() / 2,
coreOptions.pageSize()),
new InternalRowSerializer(table.rowType()),
true,
- coreOptions.writeBufferSpillDiskSize());
+ coreOptions.writeBufferSpillDiskSize(),
+ coreOptions.spillCompression());
}
public void bootstrapKey(InternalRow value) throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
deleted file mode 100644
index ab91b8944..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.disk;
-
-import org.apache.paimon.data.AbstractPagedInputView;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.memory.Buffer;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.utils.MutableObjectIterator;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/** An {@link AbstractPagedInputView} which reads blocks from channel without
compression. */
-public class BufferFileReaderInputView extends AbstractPagedInputView {
-
- private final BufferFileReader reader;
- private final MemorySegment segment;
-
- private int currentSegmentLimit;
-
- public BufferFileReaderInputView(FileIOChannel.ID id, IOManager ioManager,
int segmentSize)
- throws IOException {
- this.reader = ioManager.createBufferFileReader(id);
- this.segment = MemorySegment.wrap(new byte[segmentSize]);
- }
-
- @Override
- protected MemorySegment nextSegment(MemorySegment current) throws
IOException {
- if (reader.hasReachedEndOfFile()) {
- throw new EOFException();
- }
-
- Buffer buffer = Buffer.create(segment);
- reader.readInto(buffer);
- this.currentSegmentLimit = buffer.getSize();
- return segment;
- }
-
- @Override
- protected int getLimitForSegment(MemorySegment segment) {
- return currentSegmentLimit;
- }
-
- public void close() throws IOException {
- reader.close();
- }
-
- public FileIOChannel getChannel() {
- return reader;
- }
-
- public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
- BinaryRowSerializer serializer) {
- return new BinaryRowChannelInputViewIterator(serializer);
- }
-
- private class BinaryRowChannelInputViewIterator implements
MutableObjectIterator<BinaryRow> {
-
- protected final BinaryRowSerializer serializer;
-
- public BinaryRowChannelInputViewIterator(BinaryRowSerializer
serializer) {
- this.serializer = serializer;
- }
-
- @Override
- public BinaryRow next(BinaryRow reuse) throws IOException {
- try {
- return this.serializer.deserializeFromPages(reuse,
BufferFileReaderInputView.this);
- } catch (EOFException e) {
- close();
- return null;
- }
- }
-
- @Override
- public BinaryRow next() throws IOException {
- throw new UnsupportedOperationException(
- "This method is disabled due to performance issue!");
- }
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
index e0ecaa68c..aeb6e08c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
@@ -21,9 +21,12 @@ package org.apache.paimon.disk;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.MutableObjectIterator;
import java.io.EOFException;
import java.io.IOException;
@@ -100,4 +103,34 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
public FileIOChannel getChannel() {
return reader;
}
+
+ public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
+ BinaryRowSerializer serializer) {
+ return new BinaryRowChannelInputViewIterator(serializer);
+ }
+
+ private class BinaryRowChannelInputViewIterator implements
MutableObjectIterator<BinaryRow> {
+
+ protected final BinaryRowSerializer serializer;
+
+ public BinaryRowChannelInputViewIterator(BinaryRowSerializer
serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public BinaryRow next(BinaryRow reuse) throws IOException {
+ try {
+ return this.serializer.deserializeFromPages(reuse,
ChannelReaderInputView.this);
+ } catch (EOFException e) {
+ close();
+ return null;
+ }
+ }
+
+ @Override
+ public BinaryRow next() throws IOException {
+ throw new UnsupportedOperationException(
+ "This method is disabled due to performance issue!");
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
index 98a532cc5..6a1838e9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
@@ -71,6 +71,14 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
return -1;
}
+ public void closeAndDelete() throws IOException {
+ try {
+ close();
+ } finally {
+ writer.deleteChannel();
+ }
+ }
+
@Override
protected MemorySegment nextSegment(MemorySegment current, int
positionInCurrent)
throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 9261a4a23..529d63d0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -19,11 +19,11 @@
package org.apache.paimon.disk;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
@@ -49,6 +49,7 @@ public class ExternalBuffer implements RowBuffer {
private final BinaryRowSerializer binaryRowSerializer;
private final InMemoryBuffer inMemoryBuffer;
private final MemorySize maxDiskSize;
+ private final BlockCompressionFactory compactionFactory;
// The size of each segment
private final int segmentSize;
@@ -62,11 +63,14 @@ public class ExternalBuffer implements RowBuffer {
IOManager ioManager,
MemorySegmentPool pool,
AbstractRowDataSerializer<?> serializer,
- MemorySize maxDiskSize) {
+ MemorySize maxDiskSize,
+ String compression) {
this.ioManager = ioManager;
this.pool = pool;
this.maxDiskSize = maxDiskSize;
+ this.compactionFactory = BlockCompressionFactory.create(compression);
+
this.binaryRowSerializer =
serializer instanceof BinaryRowSerializer
? (BinaryRowSerializer) serializer.duplicate()
@@ -155,10 +159,11 @@ public class ExternalBuffer implements RowBuffer {
private void spill() throws IOException {
FileIOChannel.ID channel = ioManager.createChannel();
- BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
+ ChannelWriterOutputView channelWriterOutputView =
+ new ChannelWriterOutputView(
+ ioManager.createBufferFileWriter(channel),
compactionFactory, segmentSize);
int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
ArrayList<MemorySegment> segments =
inMemoryBuffer.getRecordBufferSegments();
- long writeBytes;
try {
// spill in memory buffer in zero-copy.
for (int i = 0; i < numRecordBuffers; i++) {
@@ -167,16 +172,15 @@ public class ExternalBuffer implements RowBuffer {
i == numRecordBuffers - 1
? inMemoryBuffer.getNumBytesInLastBuffer()
: segment.size();
- writer.writeBlock(Buffer.create(segment, bufferSize));
+ channelWriterOutputView.write(segment, 0, bufferSize);
}
- writeBytes = writer.getSize();
LOG.info(
"here spill the reset buffer data with {} records {}
bytes",
inMemoryBuffer.size(),
- writer.getSize());
- writer.close();
+ channelWriterOutputView.getNumBytes());
+ channelWriterOutputView.close();
} catch (IOException e) {
- writer.closeAndDelete();
+ channelWriterOutputView.closeAndDelete();
throw e;
}
@@ -185,7 +189,7 @@ public class ExternalBuffer implements RowBuffer {
channel,
inMemoryBuffer.getNumRecordBuffers(),
inMemoryBuffer.getNumBytesInLastBuffer(),
- writeBytes));
+ channelWriterOutputView.getNumBytes()));
inMemoryBuffer.reset();
}
@@ -224,7 +228,7 @@ public class ExternalBuffer implements RowBuffer {
private int currentChannelID = -1;
private BinaryRow row;
private boolean closed;
- private BufferFileReaderInputView channelReader;
+ private ChannelReaderInputView channelReader;
private BufferIterator() {
this.closed = false;
@@ -303,7 +307,13 @@ public class ExternalBuffer implements RowBuffer {
// new reader.
this.channelReader =
- new BufferFileReaderInputView(channel.getChannel(),
ioManager, segmentSize);
+ new ChannelReaderInputView(
+ channel.getChannel(),
+ ioManager,
+ compactionFactory,
+ segmentSize,
+ channel.getBlockCount());
+
this.currentIterator =
channelReader.createBinaryRowIterator(binaryRowSerializer);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index b4d3aa89f..3c0a31cd6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -59,9 +59,10 @@ public interface RowBuffer {
MemorySegmentPool memoryPool,
AbstractRowDataSerializer<InternalRow> serializer,
boolean spillable,
- MemorySize maxDiskSize) {
+ MemorySize maxDiskSize,
+ String compression) {
if (spillable) {
- return new ExternalBuffer(ioManager, memoryPool, serializer,
maxDiskSize);
+ return new ExternalBuffer(ioManager, memoryPool, serializer,
maxDiskSize, compression);
} else {
return new InMemoryBuffer(memoryPool, serializer);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index dbf73caef..f794b160c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -68,6 +68,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
private final int compactionMaxFileNum;
private final boolean commitForceCompact;
private final String fileCompression;
+ private final String spillCompression;
private final boolean useWriteBuffer;
private final boolean spillable;
private final MemorySize maxDiskSize;
@@ -101,6 +102,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
this.commitForceCompact = options.commitForceCompact();
this.skipCompaction = options.writeOnly();
this.fileCompression = options.fileCompression();
+ this.spillCompression = options.spillCompression();
this.useWriteBuffer = options.useWriteBufferForAppend();
this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(),
isStreamingMode);
this.maxDiskSize = options.writeBufferSpillDiskSize();
@@ -149,6 +151,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
useWriteBuffer || forceBufferSpill,
spillable || forceBufferSpill,
fileCompression,
+ spillCompression,
statsCollectors,
maxDiskSize);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 2c398d141..87bb14745 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -600,6 +600,7 @@ public class AppendOnlyWriterTest {
useWriteBuffer,
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
+ CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options,
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE);
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 818951943..3ea27cfa7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.disk;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
@@ -65,7 +66,8 @@ public class ExternalBufferTest {
ioManager,
new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE,
DEFAULT_PAGE_SIZE),
this.serializer,
- maxDiskSize);
+ maxDiskSize,
+ CoreOptions.SPILL_COMPRESSION.defaultValue());
}
@Test
@@ -179,7 +181,8 @@ public class ExternalBufferTest {
ioManager,
new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE,
DEFAULT_PAGE_SIZE),
new BinaryRowSerializer(1),
- MemorySize.MAX_VALUE);
+ MemorySize.MAX_VALUE,
+ CoreOptions.SPILL_COMPRESSION.defaultValue());
assertThatThrownBy(() ->
writeHuge(buffer)).isInstanceOf(IOException.class);
buffer.reset();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 608345696..22826d5bc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,6 +85,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
false,
false,
CoreOptions.FILE_COMPRESSION.defaultValue(),
+ CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE);