This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6757f5648 [core] Add compression for ExternalBuffer. (#3146)
6757f5648 is described below

commit 6757f56482fd6ba9503c439b2a0e44e30af5e3cd
Author: YeJunHao <[email protected]>
AuthorDate: Wed Apr 3 09:48:17 2024 +0800

    [core] Add compression for ExternalBuffer. (#3146)
---
 .../apache/paimon/memory/MemorySegmentPool.java    |  2 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java | 15 +++-
 .../paimon/crosspartition/GlobalIndexAssigner.java |  3 +-
 .../paimon/disk/BufferFileReaderInputView.java     | 99 ----------------------
 .../apache/paimon/disk/ChannelReaderInputView.java | 33 ++++++++
 .../paimon/disk/ChannelWriterOutputView.java       |  8 ++
 .../org/apache/paimon/disk/ExternalBuffer.java     | 34 +++++---
 .../java/org/apache/paimon/disk/RowBuffer.java     |  5 +-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  3 +
 .../apache/paimon/append/AppendOnlyWriterTest.java |  1 +
 .../org/apache/paimon/disk/ExternalBufferTest.java |  7 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |  1 +
 12 files changed, 90 insertions(+), 121 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
index 9ea5fa3f6..18538b2fa 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
@@ -36,7 +36,7 @@ public interface MemorySegmentPool extends MemorySegmentSource {
     /**
      * Get the page size of each page this pool holds.
      *
-     * @return the page size
+     * @return the page size, the bytes size in one page.
      */
     int pageSize();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9566dd372..4b00eae1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -71,6 +71,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
+    private final String spillCompression;
     private SinkWriter sinkWriter;
     private final FieldStatsCollector.Factory[] statsCollectors;
     private final IOManager ioManager;
@@ -93,6 +94,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
             boolean useWriteBuffer,
             boolean spillable,
             String fileCompression,
+            String spillCompression,
             FieldStatsCollector.Factory[] statsCollectors,
             MemorySize maxDiskSize) {
         this.fileIO = fileIO;
@@ -109,13 +111,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
         this.fileCompression = fileCompression;
+        this.spillCompression = spillCompression;
         this.ioManager = ioManager;
         this.statsCollectors = statsCollectors;
         this.maxDiskSize = maxDiskSize;
 
         this.sinkWriter =
                 useWriteBuffer
-                        ? new BufferedSinkWriter(spillable, maxDiskSize)
+                        ? new BufferedSinkWriter(spillable, maxDiskSize, spillCompression)
                         : new DirectSinkWriter();
 
         if (increment != null) {
@@ -211,7 +214,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
             trySyncLatestCompaction(true);
 
             sinkWriter.close();
-            sinkWriter = new BufferedSinkWriter(true, maxDiskSize);
+            sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
             sinkWriter.setMemoryPool(memorySegmentPool);
         }
     }
@@ -378,11 +381,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
 
         private final MemorySize maxDiskSize;
 
+        private final String compression;
+
         private RowBuffer writeBuffer;
 
-        private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) {
+        private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize, String compression) {
             this.spillable = spillable;
             this.maxDiskSize = maxDiskSize;
+            this.compression = compression;
         }
 
         @Override
@@ -439,7 +445,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
                             memoryPool,
                             new InternalRowSerializer(writeSchema),
                             spillable,
-                            maxDiskSize);
+                            maxDiskSize,
+                            compression);
         }
 
         @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index c53d055d5..819937984 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -171,7 +171,8 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
                                 coreOptions.writeBufferSize() / 2, coreOptions.pageSize()),
                         new InternalRowSerializer(table.rowType()),
                         true,
-                        coreOptions.writeBufferSpillDiskSize());
+                        coreOptions.writeBufferSpillDiskSize(),
+                        coreOptions.spillCompression());
     }
 
     public void bootstrapKey(InternalRow value) throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
deleted file mode 100644
index ab91b8944..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.disk;
-
-import org.apache.paimon.data.AbstractPagedInputView;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.memory.Buffer;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.utils.MutableObjectIterator;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-/** An {@link AbstractPagedInputView} which reads blocks from channel without compression. */
-public class BufferFileReaderInputView extends AbstractPagedInputView {
-
-    private final BufferFileReader reader;
-    private final MemorySegment segment;
-
-    private int currentSegmentLimit;
-
-    public BufferFileReaderInputView(FileIOChannel.ID id, IOManager ioManager, int segmentSize)
-            throws IOException {
-        this.reader = ioManager.createBufferFileReader(id);
-        this.segment = MemorySegment.wrap(new byte[segmentSize]);
-    }
-
-    @Override
-    protected MemorySegment nextSegment(MemorySegment current) throws IOException {
-        if (reader.hasReachedEndOfFile()) {
-            throw new EOFException();
-        }
-
-        Buffer buffer = Buffer.create(segment);
-        reader.readInto(buffer);
-        this.currentSegmentLimit = buffer.getSize();
-        return segment;
-    }
-
-    @Override
-    protected int getLimitForSegment(MemorySegment segment) {
-        return currentSegmentLimit;
-    }
-
-    public void close() throws IOException {
-        reader.close();
-    }
-
-    public FileIOChannel getChannel() {
-        return reader;
-    }
-
-    public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
-            BinaryRowSerializer serializer) {
-        return new BinaryRowChannelInputViewIterator(serializer);
-    }
-
-    private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {
-
-        protected final BinaryRowSerializer serializer;
-
-        public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {
-            this.serializer = serializer;
-        }
-
-        @Override
-        public BinaryRow next(BinaryRow reuse) throws IOException {
-            try {
-                return this.serializer.deserializeFromPages(reuse, BufferFileReaderInputView.this);
-            } catch (EOFException e) {
-                close();
-                return null;
-            }
-        }
-
-        @Override
-        public BinaryRow next() throws IOException {
-            throw new UnsupportedOperationException(
-                    "This method is disabled due to performance issue!");
-        }
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
index e0ecaa68c..aeb6e08c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
@@ -21,9 +21,12 @@ package org.apache.paimon.disk;
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.BlockDecompressor;
 import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.memory.Buffer;
 import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.MutableObjectIterator;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -100,4 +103,34 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
     public FileIOChannel getChannel() {
         return reader;
     }
+
+    public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
+            BinaryRowSerializer serializer) {
+        return new BinaryRowChannelInputViewIterator(serializer);
+    }
+
+    private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {
+
+        protected final BinaryRowSerializer serializer;
+
+        public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public BinaryRow next(BinaryRow reuse) throws IOException {
+            try {
+                return this.serializer.deserializeFromPages(reuse, ChannelReaderInputView.this);
+            } catch (EOFException e) {
+                close();
+                return null;
+            }
+        }
+
+        @Override
+        public BinaryRow next() throws IOException {
+            throw new UnsupportedOperationException(
+                    "This method is disabled due to performance issue!");
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
index 98a532cc5..6a1838e9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
@@ -71,6 +71,14 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
         return -1;
     }
 
+    public void closeAndDelete() throws IOException {
+        try {
+            close();
+        } finally {
+            writer.deleteChannel();
+        }
+    }
+
     @Override
     protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent)
             throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 9261a4a23..529d63d0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -19,11 +19,11 @@
 package org.apache.paimon.disk;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.memory.Buffer;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.MemorySize;
@@ -49,6 +49,7 @@ public class ExternalBuffer implements RowBuffer {
     private final BinaryRowSerializer binaryRowSerializer;
     private final InMemoryBuffer inMemoryBuffer;
     private final MemorySize maxDiskSize;
+    private final BlockCompressionFactory compactionFactory;
 
     // The size of each segment
     private final int segmentSize;
@@ -62,11 +63,14 @@ public class ExternalBuffer implements RowBuffer {
             IOManager ioManager,
             MemorySegmentPool pool,
             AbstractRowDataSerializer<?> serializer,
-            MemorySize maxDiskSize) {
+            MemorySize maxDiskSize,
+            String compression) {
         this.ioManager = ioManager;
         this.pool = pool;
         this.maxDiskSize = maxDiskSize;
 
+        this.compactionFactory = BlockCompressionFactory.create(compression);
+
         this.binaryRowSerializer =
                 serializer instanceof BinaryRowSerializer
                         ? (BinaryRowSerializer) serializer.duplicate()
@@ -155,10 +159,11 @@ public class ExternalBuffer implements RowBuffer {
     private void spill() throws IOException {
         FileIOChannel.ID channel = ioManager.createChannel();
 
-        BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
+        ChannelWriterOutputView channelWriterOutputView =
+                new ChannelWriterOutputView(
+                        ioManager.createBufferFileWriter(channel), compactionFactory, segmentSize);
         int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
         ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
-        long writeBytes;
         try {
             // spill in memory buffer in zero-copy.
             for (int i = 0; i < numRecordBuffers; i++) {
@@ -167,16 +172,15 @@ public class ExternalBuffer implements RowBuffer {
                         i == numRecordBuffers - 1
                                 ? inMemoryBuffer.getNumBytesInLastBuffer()
                                 : segment.size();
-                writer.writeBlock(Buffer.create(segment, bufferSize));
+                channelWriterOutputView.write(segment, 0, bufferSize);
             }
-            writeBytes = writer.getSize();
             LOG.info(
                     "here spill the reset buffer data with {} records {} bytes",
                     inMemoryBuffer.size(),
-                    writer.getSize());
-            writer.close();
+                    channelWriterOutputView.getNumBytes());
+            channelWriterOutputView.close();
         } catch (IOException e) {
-            writer.closeAndDelete();
+            channelWriterOutputView.closeAndDelete();
             throw e;
         }
 
@@ -185,7 +189,7 @@ public class ExternalBuffer implements RowBuffer {
                         channel,
                         inMemoryBuffer.getNumRecordBuffers(),
                         inMemoryBuffer.getNumBytesInLastBuffer(),
-                        writeBytes));
+                        channelWriterOutputView.getNumBytes()));
 
         inMemoryBuffer.reset();
     }
@@ -224,7 +228,7 @@ public class ExternalBuffer implements RowBuffer {
         private int currentChannelID = -1;
         private BinaryRow row;
         private boolean closed;
-        private BufferFileReaderInputView channelReader;
+        private ChannelReaderInputView channelReader;
 
         private BufferIterator() {
             this.closed = false;
@@ -303,7 +307,13 @@ public class ExternalBuffer implements RowBuffer {
 
             // new reader.
             this.channelReader =
-                    new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize);
+                    new ChannelReaderInputView(
+                            channel.getChannel(),
+                            ioManager,
+                            compactionFactory,
+                            segmentSize,
+                            channel.getBlockCount());
+
             this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer);
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index b4d3aa89f..3c0a31cd6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -59,9 +59,10 @@ public interface RowBuffer {
             MemorySegmentPool memoryPool,
             AbstractRowDataSerializer<InternalRow> serializer,
             boolean spillable,
-            MemorySize maxDiskSize) {
+            MemorySize maxDiskSize,
+            String compression) {
         if (spillable) {
-            return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize);
+            return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize, compression);
         } else {
             return new InMemoryBuffer(memoryPool, serializer);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index dbf73caef..f794b160c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -68,6 +68,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
     private final int compactionMaxFileNum;
     private final boolean commitForceCompact;
     private final String fileCompression;
+    private final String spillCompression;
     private final boolean useWriteBuffer;
     private final boolean spillable;
     private final MemorySize maxDiskSize;
@@ -101,6 +102,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
         this.commitForceCompact = options.commitForceCompact();
         this.skipCompaction = options.writeOnly();
         this.fileCompression = options.fileCompression();
+        this.spillCompression = options.spillCompression();
         this.useWriteBuffer = options.useWriteBufferForAppend();
         this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
         this.maxDiskSize = options.writeBufferSpillDiskSize();
@@ -149,6 +151,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
                 useWriteBuffer || forceBufferSpill,
                 spillable || forceBufferSpill,
                 fileCompression,
+                spillCompression,
                 statsCollectors,
                 maxDiskSize);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 2c398d141..87bb14745 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -600,6 +600,7 @@ public class AppendOnlyWriterTest {
                         useWriteBuffer,
                         spillable,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        CoreOptions.SPILL_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
                                 options, AppendOnlyWriterTest.SCHEMA.getFieldNames()),
                         MemorySize.MAX_VALUE);
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 818951943..3ea27cfa7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.disk;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
@@ -65,7 +66,8 @@ public class ExternalBufferTest {
                 ioManager,
                 new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
                 this.serializer,
-                maxDiskSize);
+                maxDiskSize,
+                CoreOptions.SPILL_COMPRESSION.defaultValue());
     }
 
     @Test
@@ -179,7 +181,8 @@ public class ExternalBufferTest {
                         ioManager,
                         new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
                         new BinaryRowSerializer(1),
-                        MemorySize.MAX_VALUE);
+                        MemorySize.MAX_VALUE,
+                        CoreOptions.SPILL_COMPRESSION.defaultValue());
         assertThatThrownBy(() -> writeHuge(buffer)).isInstanceOf(IOException.class);
         buffer.reset();
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 608345696..22826d5bc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,6 +85,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                         false,
                         false,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        CoreOptions.SPILL_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
                                 options, SCHEMA.getFieldNames()),
                         MemorySize.MAX_VALUE);

Reply via email to