This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.5
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 52be707ed3e1bf38291b2539744a5dab8fc2fbc0
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 31 19:07:37 2023 +0800

    [flink] Fix global dynamic bucket mode (#1914)
---
 .../data/serializer/InternalRowSerializer.java     |   2 +-
 .../paimon/disk/BufferFileReaderInputView.java     |  99 ++++++
 .../org/apache/paimon/disk/ExternalBuffer.java     | 383 +++++++++++++++++++++
 .../org/apache/paimon/disk/ExternalBufferTest.java | 209 +++++++++++
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |   4 -
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |   3 -
 .../flink/sink/index/GlobalDynamicBucketSink.java  |  16 +-
 .../sink/index/GlobalDynamicCdcBucketSink.java     | 133 -------
 .../flink/sink/index/GlobalIndexAssigner.java      |  29 +-
 .../sink/index/GlobalIndexAssignerOperator.java    | 107 ++++--
 .../paimon/flink/sink/index/IndexBootstrap.java    |  48 ++-
 .../sink/index/KeyPartRowChannelComputer.java      |   6 +-
 .../flink/GlobalDynamicBucketTableITCase.java      |  19 +
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      |   2 +
 .../flink/sink/index/IndexBootstrapTest.java       |  17 +-
 15 files changed, 875 insertions(+), 202 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index b0701be48..0f573f10b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -72,7 +72,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
     }
 
     @Override
-    public Serializer<InternalRow> duplicate() {
+    public InternalRowSerializer duplicate() {
         Serializer<?>[] duplicateFieldSerializers = new Serializer[fieldSerializers.length];
         for (int i = 0; i < fieldSerializers.length; i++) {
             duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
new file mode 100644
index 000000000..ab91b8944
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.paimon.disk; + +import org.apache.paimon.data.AbstractPagedInputView; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.memory.Buffer; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.utils.MutableObjectIterator; + +import java.io.EOFException; +import java.io.IOException; + +/** An {@link AbstractPagedInputView} which reads blocks from channel without compression. */ +public class BufferFileReaderInputView extends AbstractPagedInputView { + + private final BufferFileReader reader; + private final MemorySegment segment; + + private int currentSegmentLimit; + + public BufferFileReaderInputView(FileIOChannel.ID id, IOManager ioManager, int segmentSize) + throws IOException { + this.reader = ioManager.createBufferFileReader(id); + this.segment = MemorySegment.wrap(new byte[segmentSize]); + } + + @Override + protected MemorySegment nextSegment(MemorySegment current) throws IOException { + if (reader.hasReachedEndOfFile()) { + throw new EOFException(); + } + + Buffer buffer = Buffer.create(segment); + reader.readInto(buffer); + this.currentSegmentLimit = buffer.getSize(); + return segment; + } + + @Override + protected int getLimitForSegment(MemorySegment segment) { + return currentSegmentLimit; + } + + public void close() throws IOException { + reader.close(); + } + + public FileIOChannel getChannel() { + return reader; + } + + public MutableObjectIterator<BinaryRow> createBinaryRowIterator( + BinaryRowSerializer serializer) { + return new BinaryRowChannelInputViewIterator(serializer); + } + + private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> { + + protected final BinaryRowSerializer serializer; + + public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) { + this.serializer = serializer; + } + + @Override + public BinaryRow next(BinaryRow reuse) throws IOException { + try { + return this.serializer.deserializeFromPages(reuse, BufferFileReaderInputView.this); + } catch (EOFException e) { + close(); + return null; + } + } + + @Override + public BinaryRow next() throws IOException { + throw new UnsupportedOperationException( + "This method is disabled due to performance issue!"); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java new file mode 100644 index 000000000..9ced8e83d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.disk; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.RandomAccessInputView; +import org.apache.paimon.data.SimpleCollectingOutputView; +import org.apache.paimon.data.serializer.AbstractRowDataSerializer; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.memory.Buffer; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.utils.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkState; + +/** An external buffer for storing rows, it will spill the data to disk when the memory is full. */ +public class ExternalBuffer { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class); + + private final IOManager ioManager; + private final MemorySegmentPool pool; + private final BinaryRowSerializer binaryRowSerializer; + private final InMemoryBuffer inMemoryBuffer; + + // The size of each segment + private final int segmentSize; + + private final List<ChannelWithMeta> spilledChannelIDs; + private int numRows; + + private boolean addCompleted; + + public ExternalBuffer( + IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer<?> serializer) { + this.ioManager = ioManager; + this.pool = pool; + + this.binaryRowSerializer = + serializer instanceof BinaryRowSerializer + ? (BinaryRowSerializer) serializer.duplicate() + : new BinaryRowSerializer(serializer.getArity()); + + this.segmentSize = pool.pageSize(); + + this.spilledChannelIDs = new ArrayList<>(); + + this.numRows = 0; + + this.addCompleted = false; + + //noinspection unchecked + this.inMemoryBuffer = + new InMemoryBuffer((AbstractRowDataSerializer<InternalRow>) serializer); + } + + public void reset() { + clearChannels(); + inMemoryBuffer.reset(); + numRows = 0; + addCompleted = false; + } + + public void add(InternalRow row) throws IOException { + checkState(!addCompleted, "This buffer has add completed."); + if (!inMemoryBuffer.write(row)) { + // Check if record is too big. + if (inMemoryBuffer.getCurrentDataBufferOffset() == 0) { + throwTooBigException(row); + } + spill(); + if (!inMemoryBuffer.write(row)) { + throwTooBigException(row); + } + } + + numRows++; + } + + public void complete() { + addCompleted = true; + } + + public BufferIterator newIterator() { + checkState(addCompleted, "This buffer has not add completed."); + return new BufferIterator(); + } + + private void throwTooBigException(InternalRow row) throws IOException { + int rowSize = inMemoryBuffer.serializer.toBinaryRow(row).toBytes().length; + throw new IOException( + "Record is too big, it can't be added to a empty InMemoryBuffer! " + + "Record size: " + + rowSize + + ", Buffer: " + + memorySize()); + } + + private void spill() throws IOException { + FileIOChannel.ID channel = ioManager.createChannel(); + + BufferFileWriter writer = ioManager.createBufferFileWriter(channel); + int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers(); + ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments(); + try { + // spill in memory buffer in zero-copy. 
+ for (int i = 0; i < numRecordBuffers; i++) { + MemorySegment segment = segments.get(i); + int bufferSize = + i == numRecordBuffers - 1 + ? inMemoryBuffer.getNumBytesInLastBuffer() + : segment.size(); + writer.writeBlock(Buffer.create(segment, bufferSize)); + } + LOG.info( + "here spill the reset buffer data with {} records {} bytes", + inMemoryBuffer.numRecords, + writer.getSize()); + writer.close(); + } catch (IOException e) { + writer.closeAndDelete(); + throw e; + } + + spilledChannelIDs.add( + new ChannelWithMeta( + channel, + inMemoryBuffer.getNumRecordBuffers(), + inMemoryBuffer.getNumBytesInLastBuffer())); + + inMemoryBuffer.reset(); + } + + public int size() { + return numRows; + } + + private int memorySize() { + return pool.freePages() * segmentSize; + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private void clearChannels() { + for (ChannelWithMeta meta : spilledChannelIDs) { + final File f = new File(meta.getChannel().getPath()); + if (f.exists()) { + f.delete(); + } + } + spilledChannelIDs.clear(); + } + + /** Iterator of external buffer. */ + public class BufferIterator implements Closeable { + + private MutableObjectIterator<BinaryRow> currentIterator; + private final BinaryRow reuse = binaryRowSerializer.createInstance(); + + private int currentChannelID = -1; + private BinaryRow row; + private boolean closed; + private BufferFileReaderInputView channelReader; + + private BufferIterator() { + this.closed = false; + } + + private void checkValidity() { + if (closed) { + throw new RuntimeException("This iterator is closed!"); + } + } + + @Override + public void close() { + if (closed) { + return; + } + + try { + closeCurrentFileReader(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + closed = true; + } + + public boolean advanceNext() { + checkValidity(); + + try { + // get from curr iterator or new iterator. + while (true) { + if (currentIterator != null && (row = currentIterator.next(reuse)) != null) { + return true; + } else { + if (!nextIterator()) { + return false; + } + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private boolean nextIterator() throws IOException { + if (currentChannelID == Integer.MAX_VALUE) { + return false; + } else if (currentChannelID < spilledChannelIDs.size() - 1) { + nextSpilledIterator(); + } else { + newMemoryIterator(); + } + return true; + } + + public BinaryRow getRow() { + return row; + } + + private void closeCurrentFileReader() throws IOException { + if (channelReader != null) { + channelReader.close(); + channelReader = null; + } + } + + private void nextSpilledIterator() throws IOException { + ChannelWithMeta channel = spilledChannelIDs.get(currentChannelID + 1); + currentChannelID++; + + // close current reader first. + closeCurrentFileReader(); + + // new reader. 
+ this.channelReader = + new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize); + this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer); + } + + private void newMemoryIterator() { + this.currentChannelID = Integer.MAX_VALUE; + this.currentIterator = inMemoryBuffer.newIterator(); + } + } + + @VisibleForTesting + List<ChannelWithMeta> getSpillChannels() { + return spilledChannelIDs; + } + + private class InMemoryBuffer { + + private final AbstractRowDataSerializer<InternalRow> serializer; + private final ArrayList<MemorySegment> recordBufferSegments; + private final SimpleCollectingOutputView recordCollector; + + private long currentDataBufferOffset; + private int numBytesInLastBuffer; + private int numRecords = 0; + + private InMemoryBuffer(AbstractRowDataSerializer<InternalRow> serializer) { + // serializer has states, so we must duplicate + this.serializer = (AbstractRowDataSerializer<InternalRow>) serializer.duplicate(); + this.recordBufferSegments = new ArrayList<>(); + this.recordCollector = + new SimpleCollectingOutputView(this.recordBufferSegments, pool, segmentSize); + } + + private void reset() { + this.currentDataBufferOffset = 0; + this.numRecords = 0; + returnToSegmentPool(); + this.recordCollector.reset(); + } + + private void returnToSegmentPool() { + pool.returnAll(this.recordBufferSegments); + this.recordBufferSegments.clear(); + } + + public boolean write(InternalRow row) throws IOException { + try { + this.serializer.serializeToPages(row, this.recordCollector); + currentDataBufferOffset = this.recordCollector.getCurrentOffset(); + numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment(); + numRecords++; + return true; + } catch (EOFException e) { + return false; + } + } + + private ArrayList<MemorySegment> getRecordBufferSegments() { + return recordBufferSegments; + } + + private long getCurrentDataBufferOffset() { + return currentDataBufferOffset; + } + + private int getNumRecordBuffers() { + int result = (int) (currentDataBufferOffset / segmentSize); + long mod = currentDataBufferOffset % segmentSize; + if (mod != 0) { + result += 1; + } + return result; + } + + private int getNumBytesInLastBuffer() { + return numBytesInLastBuffer; + } + + private InMemoryBufferIterator newIterator() { + RandomAccessInputView recordBuffer = + new RandomAccessInputView( + this.recordBufferSegments, segmentSize, numBytesInLastBuffer); + return new InMemoryBufferIterator(recordBuffer, serializer); + } + } + + private static class InMemoryBufferIterator + implements MutableObjectIterator<BinaryRow>, Closeable { + + private final RandomAccessInputView recordBuffer; + private final AbstractRowDataSerializer<InternalRow> serializer; + + private InMemoryBufferIterator( + RandomAccessInputView recordBuffer, + AbstractRowDataSerializer<InternalRow> serializer) { + this.recordBuffer = recordBuffer; + this.serializer = serializer; + } + + @Override + public BinaryRow next(BinaryRow reuse) throws IOException { + try { + return (BinaryRow) serializer.mapFromPages(reuse, recordBuffer); + } catch (EOFException e) { + return null; + } + } + + @Override + public BinaryRow next() throws IOException { + throw new RuntimeException("Not support!"); + } + + @Override + public void close() {} + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java new file mode 100644 index 000000000..5bd9ad511 --- /dev/null +++ 
b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.disk; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.memory.HeapMemorySegmentPool; + +import org.apache.commons.math3.random.RandomDataGenerator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link ExternalBuffer}. */ +public class ExternalBufferTest { + + @TempDir Path tempDir; + + private IOManager ioManager; + private Random random; + private BinaryRowSerializer serializer; + + @BeforeEach + public void before() { + this.ioManager = IOManager.create(tempDir.toString()); + this.random = new Random(); + this.serializer = new BinaryRowSerializer(1); + } + + private ExternalBuffer newBuffer() { + return new ExternalBuffer( + ioManager, + new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE), + this.serializer); + } + + @Test + public void testLess() throws Exception { + ExternalBuffer buffer = newBuffer(); + + int number = 100; + List<Long> expected = insertMulti(buffer, number); + assertThat(number).isEqualTo(buffer.size()); + assertBuffer(expected, buffer); + assertThat(0).isEqualTo(buffer.getSpillChannels().size()); + + // repeat read + assertBuffer(expected, buffer); + buffer.newIterator(); + assertBuffer(expected, buffer); + buffer.reset(); + } + + @Test + public void testSpill() throws Exception { + ExternalBuffer buffer = newBuffer(); + + int number = 5000; // 16 * 5000 + List<Long> expected = insertMulti(buffer, number); + assertThat(number).isEqualTo(buffer.size()); + assertBuffer(expected, buffer); + assertThat(buffer.getSpillChannels().size()).isGreaterThan(0); + + // repeat read + assertBuffer(expected, buffer); + buffer.newIterator(); + assertBuffer(expected, buffer); + buffer.reset(); + } + + @Test + public void testBufferReset() throws Exception { + ExternalBuffer buffer = newBuffer(); + + // less + insertMulti(buffer, 10); + buffer.reset(); + assertThat(0).isEqualTo(buffer.size()); + + // not spill + List<Long> expected = insertMulti(buffer, 100); + assertThat(100).isEqualTo(buffer.size()); + assertBuffer(expected, buffer); + buffer.reset(); + 
+ // spill + expected = insertMulti(buffer, 2500); + assertThat(2500).isEqualTo(buffer.size()); + assertBuffer(expected, buffer); + buffer.reset(); + } + + @Test + public void testBufferResetWithSpill() throws Exception { + int inMemoryThreshold = 20; + ExternalBuffer buffer = newBuffer(); + + // spill + List<Long> expected = insertMulti(buffer, 5000); + assertThat(5000).isEqualTo(buffer.size()); + assertBuffer(expected, buffer); + buffer.reset(); + + // spill, but not read the values + insertMulti(buffer, 5000); + buffer.newIterator(); + assertThat(5000).isEqualTo(buffer.size()); + buffer.reset(); + + // not spill + expected = insertMulti(buffer, inMemoryThreshold / 2); + assertBuffer(expected, buffer); + buffer.reset(); + assertThat(0).isEqualTo(buffer.size()); + + // less + expected = insertMulti(buffer, 100); + assertThat(100).isEqualTo(buffer.size()); + assertBuffer(expected, buffer); + buffer.reset(); + } + + @Test + public void testHugeRecord() { + ExternalBuffer buffer = + new ExternalBuffer( + ioManager, + new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE), + new BinaryRowSerializer(1)); + assertThatThrownBy(() -> writeHuge(buffer)).isInstanceOf(IOException.class); + buffer.reset(); + } + + private void writeHuge(ExternalBuffer buffer) throws IOException { + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.reset(); + RandomDataGenerator random = new RandomDataGenerator(); + writer.writeString(0, BinaryString.fromString(random.nextHexString(500000))); + writer.complete(); + buffer.add(row); + } + + private void assertBuffer(List<Long> expected, ExternalBuffer buffer) { + ExternalBuffer.BufferIterator iterator = buffer.newIterator(); + assertBuffer(expected, iterator); + iterator.close(); + } + + private void assertBuffer(List<Long> expected, ExternalBuffer.BufferIterator iterator) { + List<Long> values = new ArrayList<>(); + while (iterator.advanceNext()) { + values.add(iterator.getRow().getLong(0)); + } + assertThat(values).isEqualTo(expected); + } + + private List<Long> insertMulti(ExternalBuffer buffer, int cnt) throws IOException { + ArrayList<Long> expected = new ArrayList<>(cnt); + insertMulti(buffer, cnt, expected); + buffer.complete(); + return expected; + } + + private void insertMulti(ExternalBuffer buffer, int cnt, List<Long> expected) + throws IOException { + for (int i = 0; i < cnt; i++) { + expected.add(randomInsert(buffer)); + } + } + + private long randomInsert(ExternalBuffer buffer) throws IOException { + long l = random.nextLong(); + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row); + writer.reset(); + writer.writeLong(0, l); + writer.complete(); + buffer.add(row); + return l; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java index 4f1745406..1bda75695 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java @@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.annotation.Experimental; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink; import 
org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; @@ -119,9 +118,6 @@ public class CdcSinkBuilder<T> { return buildForFixedBucket(parsed); case DYNAMIC: return new CdcDynamicBucketSink((FileStoreTable) table).build(parsed, parallelism); - case GLOBAL_DYNAMIC: - return new GlobalDynamicCdcBucketSink((FileStoreTable) table) - .build(parsed, parallelism); case UNAWARE: default: throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index c5b716e45..a3dc7beeb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.cdc.DatabaseSyncMode; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; -import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink; import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; @@ -196,8 +195,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> { new CdcDynamicBucketSink(table).build(parsedForTable, parallelism); break; case GLOBAL_DYNAMIC: - new GlobalDynamicCdcBucketSink(table).build(parsedForTable, parallelism); - break; case UNAWARE: default: throw new UnsupportedOperationException( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java index 5f98f3b05..26cfc9b41 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java @@ -43,11 +43,10 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; +import static org.apache.paimon.flink.sink.index.IndexBootstrap.bootstrapType; /** Sink for global dynamic bucket table. 
*/ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Integer>> { @@ -71,13 +70,10 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Inte TableSchema schema = table.schema(); RowType rowType = schema.logicalRowType(); List<String> primaryKeys = schema.primaryKeys(); - List<String> partitionKeys = schema.partitionKeys(); RowDataSerializer rowSerializer = new RowDataSerializer(toLogicalType(rowType)); - RowType keyPartType = - schema.projectedLogicalRowType( - Stream.concat(primaryKeys.stream(), partitionKeys.stream()) - .collect(Collectors.toList())); - RowDataSerializer keyPartSerializer = new RowDataSerializer(toLogicalType(keyPartType)); + + RowType bootstrapType = bootstrapType(schema); + RowDataSerializer bootstrapSerializer = new RowDataSerializer(toLogicalType(bootstrapType)); // Topology: // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket --> @@ -88,7 +84,7 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Inte "INDEX_BOOTSTRAP", new InternalTypeInfo<>( new KeyWithRowSerializer<>( - keyPartSerializer, rowSerializer)), + bootstrapSerializer, rowSerializer)), new IndexBootstrapOperator<>( new IndexBootstrap(table), FlinkRowData::new)) .setParallelism(input.getParallelism()); @@ -100,7 +96,7 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Inte } KeyPartRowChannelComputer channelComputer = - new KeyPartRowChannelComputer(rowType, keyPartType, primaryKeys); + new KeyPartRowChannelComputer(rowType, bootstrapType, primaryKeys); DataStream<Tuple2<KeyPartOrRow, RowData>> partitionByKeyHash = partition(bootstraped, channelComputer, assignerParallelism); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java deleted file mode 100644 index 06bb5a147..000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.flink.sink.index; - -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.flink.sink.ChannelComputer; -import org.apache.paimon.flink.sink.Committable; -import org.apache.paimon.flink.sink.FlinkWriteSink; -import org.apache.paimon.flink.sink.StoreSinkWrite; -import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketWriteOperator; -import org.apache.paimon.flink.sink.cdc.CdcHashKeyChannelComputer; -import org.apache.paimon.flink.sink.cdc.CdcRecord; -import org.apache.paimon.flink.sink.cdc.CdcRecordUtils; -import org.apache.paimon.flink.sink.cdc.CdcWithBucketChannelComputer; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.RowDataToObjectArrayConverter; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.EnumTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; - -import javax.annotation.Nullable; - -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; - -/** Sink for global dynamic bucket table and {@link CdcRecord} inputs. */ -public class GlobalDynamicCdcBucketSink extends FlinkWriteSink<Tuple2<CdcRecord, Integer>> { - - private static final long serialVersionUID = 1L; - - public GlobalDynamicCdcBucketSink(FileStoreTable table) { - super(table, null); - } - - @Override - protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator( - StoreSinkWrite.Provider writeProvider, String commitUser) { - return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser); - } - - public DataStreamSink<?> build(DataStream<CdcRecord> input, @Nullable Integer parallelism) { - String initialCommitUser = UUID.randomUUID().toString(); - - TableSchema schema = table.schema(); - List<String> primaryKeys = schema.primaryKeys(); - List<String> partitionKeys = schema.partitionKeys(); - RowType keyPartType = - schema.projectedLogicalRowType( - Stream.concat(primaryKeys.stream(), partitionKeys.stream()) - .collect(Collectors.toList())); - List<String> keyPartNames = keyPartType.getFieldNames(); - RowDataToObjectArrayConverter keyPartConverter = - new RowDataToObjectArrayConverter(keyPartType); - - // Topology: - // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket --> - // writer --> committer - - TupleTypeInfo<Tuple2<KeyPartOrRow, CdcRecord>> tuple2TupleType = - new TupleTypeInfo<>(new EnumTypeInfo<>(KeyPartOrRow.class), input.getType()); - DataStream<Tuple2<KeyPartOrRow, CdcRecord>> bootstraped = - input.transform( - "INDEX_BOOTSTRAP", - tuple2TupleType, - new IndexBootstrapOperator<>( - new IndexBootstrap(table), - row -> - CdcRecordUtils.fromGenericRow( - GenericRow.ofKind( - row.getRowKind(), - keyPartConverter.convert(row)), - keyPartNames))) - .setParallelism(input.getParallelism()); - - // 1. 
shuffle by key hash - Integer assignerParallelism = table.coreOptions().dynamicBucketAssignerParallelism(); - if (assignerParallelism == null) { - assignerParallelism = parallelism; - } - - ChannelComputer<Tuple2<KeyPartOrRow, CdcRecord>> channelComputer = - ChannelComputer.transform( - new CdcHashKeyChannelComputer(schema), tuple2 -> tuple2.f1); - DataStream<Tuple2<KeyPartOrRow, CdcRecord>> partitionByKeyHash = - partition(bootstraped, channelComputer, assignerParallelism); - - // 2. bucket-assigner - TupleTypeInfo<Tuple2<CdcRecord, Integer>> rowWithBucketType = - new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO); - DataStream<Tuple2<CdcRecord, Integer>> bucketAssigned = - partitionByKeyHash - .transform( - "dynamic-bucket-assigner", - rowWithBucketType, - GlobalIndexAssignerOperator.forCdcRecord(table)) - .setParallelism(partitionByKeyHash.getParallelism()); - - // 3. shuffle by bucket - - DataStream<Tuple2<CdcRecord, Integer>> partitionByBucket = - partition(bucketAssigned, new CdcWithBucketChannelComputer(schema), parallelism); - - // 4. writer and committer - return sinkFrom(partitionByBucket, initialCommitUser); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java index 2d3be0a12..78340b246 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java @@ -57,7 +57,8 @@ public class GlobalIndexAssigner<T> implements Serializable { private final AbstractFileStoreTable table; private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction; private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> - keyPartExtractorFunction; + bootstrapExtractorFunction; + private final SerializableFunction<T, Integer> bootstrapBucketFunction; private final SerBiFunction<T, BinaryRow, T> setPartition; private final SerBiFunction<T, RowKind, T> setRowKind; @@ -78,12 +79,14 @@ public class GlobalIndexAssigner<T> implements Serializable { public GlobalIndexAssigner( Table table, SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction, - SerializableFunction<TableSchema, PartitionKeyExtractor<T>> keyPartExtractorFunction, + SerializableFunction<TableSchema, PartitionKeyExtractor<T>> bootstrapExtractorFunction, + SerializableFunction<T, Integer> bootstrapBucketFunction, SerBiFunction<T, BinaryRow, T> setPartition, SerBiFunction<T, RowKind, T> setRowKind) { this.table = (AbstractFileStoreTable) table; this.extractorFunction = extractorFunction; - this.keyPartExtractorFunction = keyPartExtractorFunction; + this.bootstrapExtractorFunction = bootstrapExtractorFunction; + this.bootstrapBucketFunction = bootstrapBucketFunction; this.setPartition = setPartition; this.setRowKind = setRowKind; } @@ -97,7 +100,7 @@ public class GlobalIndexAssigner<T> implements Serializable { CoreOptions coreOptions = table.coreOptions(); this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum(); this.extractor = extractorFunction.apply(table.schema()); - this.keyPartExtractor = keyPartExtractorFunction.apply(table.schema()); + this.keyPartExtractor = bootstrapExtractorFunction.apply(table.schema()); // state Options options = coreOptions.toConfiguration(); @@ -164,9 +167,12 @@ public class 
GlobalIndexAssigner<T> implements Serializable { public void bootstrap(T value) throws IOException { BinaryRow partition = keyPartExtractor.partition(value); - keyIndex.put( - keyPartExtractor.trimmedPrimaryKey(value), - new PositiveIntInt(partMapping.index(partition), assignBucket(partition))); + BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value); + int partId = partMapping.index(partition); + int bucket = bootstrapBucketFunction.apply(value); + bucketAssigner.bootstrapBucket(partition, bucket); + PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket); + keyIndex.put(key, partAndBucket); } private void processNewRecord(BinaryRow partition, int partId, BinaryRow key, T value) @@ -207,6 +213,15 @@ public class GlobalIndexAssigner<T> implements Serializable { private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>(); + public void bootstrapBucket(BinaryRow part, int bucket) { + TreeMap<Integer, Integer> bucketMap = bucketMap(part); + Integer count = bucketMap.get(bucket); + if (count == null) { + count = 0; + } + bucketMap.put(bucket, count + 1); + } + public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) { TreeMap<Integer, Integer> bucketMap = bucketMap(part); for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java index 5d5acd203..0c10b01ff 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java @@ -18,56 +18,81 @@ package org.apache.paimon.flink.sink.index; -import org.apache.paimon.data.GenericRow; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.disk.ExternalBuffer; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor; -import org.apache.paimon.flink.sink.cdc.CdcRecord; -import org.apache.paimon.flink.sink.cdc.CdcRecordPartitionKeyExtractor; -import org.apache.paimon.flink.sink.cdc.CdcRecordUtils; import org.apache.paimon.flink.utils.ProjectToRowDataFunction; -import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.AbstractFileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.RowDataToObjectArrayConverter; +import org.apache.paimon.utils.SerializableFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; import java.io.File; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import 
java.util.concurrent.ThreadLocalRandom; import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind; /** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>> - implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>> { + implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>>, + BoundedOneInput { private static final long serialVersionUID = 1L; + private final Table table; private final GlobalIndexAssigner<T> assigner; + private final SerializableFunction<T, InternalRow> toRow; + private final SerializableFunction<InternalRow, T> fromRow; - public GlobalIndexAssignerOperator(GlobalIndexAssigner<T> assigner) { + private transient ExternalBuffer bootstrapBuffer; + + public GlobalIndexAssignerOperator( + Table table, + GlobalIndexAssigner<T> assigner, + SerializableFunction<T, InternalRow> toRow, + SerializableFunction<InternalRow, T> fromRow) { + this.table = table; this.assigner = assigner; + this.toRow = toRow; + this.fromRow = fromRow; } @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - File[] tmpDirs = - getContainingTask().getEnvironment().getIOManager().getSpillingDirectories(); + org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager = + getContainingTask().getEnvironment().getIOManager(); + File[] tmpDirs = ioManager.getSpillingDirectories(); File tmpDir = tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)]; assigner.open( tmpDir, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask(), this::collect); + Options options = Options.fromMap(table.options()); + long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes(); + long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes(); + bootstrapBuffer = + new ExternalBuffer( + IOManager.create(ioManager.getSpillingDirectoriesPaths()), + new HeapMemorySegmentPool(bufferSize, (int) pageSize), + new InternalRowSerializer(table.rowType())); } @Override @@ -80,11 +105,38 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple assigner.bootstrap(value); break; case ROW: - assigner.process(value); + if (bootstrapBuffer != null) { + bootstrapBuffer.add(toRow.apply(value)); + } else { + assigner.process(value); + } break; } } + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + endBootstrap(); + } + + @Override + public void endInput() throws Exception { + endBootstrap(); + } + + private void endBootstrap() throws Exception { + if (bootstrapBuffer != null) { + bootstrapBuffer.complete(); + try (ExternalBuffer.BufferIterator iterator = bootstrapBuffer.newIterator()) { + while (iterator.advanceNext()) { + assigner.process(fromRow.apply(iterator.getRow())); + } + } + bootstrapBuffer.reset(); + bootstrapBuffer = null; + } + } + private void collect(T value, int bucket) { output.collect(new StreamRecord<>(new Tuple2<>(value, bucket))); } @@ -95,39 +147,22 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple } public static GlobalIndexAssignerOperator<RowData> forRowData(Table table) { - return new GlobalIndexAssignerOperator<>(createRowDataAssigner(table)); + return new GlobalIndexAssignerOperator<>( + table, createRowDataAssigner(table), FlinkRowWrapper::new, FlinkRowData::new); } public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table 
t) { + RowType bootstrapType = IndexBootstrap.bootstrapType(((AbstractFileStoreTable) t).schema()); + int bucketIndex = bootstrapType.getFieldCount() - 1; return new GlobalIndexAssigner<>( t, RowDataPartitionKeyExtractor::new, KeyPartPartitionKeyExtractor::new, + row -> row.getInt(bucketIndex), new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()), (rowData, rowKind) -> { rowData.setRowKind(toFlinkRowKind(rowKind)); return rowData; }); } - - public static GlobalIndexAssignerOperator<CdcRecord> forCdcRecord(Table table) { - RowType partitionType = ((FileStoreTable) table).schema().logicalPartitionType(); - List<String> partitionNames = partitionType.getFieldNames(); - RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType); - GlobalIndexAssigner<CdcRecord> assigner = - new GlobalIndexAssigner<>( - table, - CdcRecordPartitionKeyExtractor::new, - CdcRecordPartitionKeyExtractor::new, - (record, part) -> { - CdcRecord partCdc = - CdcRecordUtils.fromGenericRow( - GenericRow.of(converter.convert(part)), partitionNames); - Map<String, String> fields = new HashMap<>(record.fields()); - fields.putAll(partCdc.fields()); - return new CdcRecord(record.kind(), fields); - }, - CdcRecord::setRowKind); - return new GlobalIndexAssignerOperator<>(assigner); - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java index 2df812007..95b4c1a94 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java @@ -18,26 +18,40 @@ package org.apache.paimon.flink.sink.index; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.AbstractInnerTableScan; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.paimon.CoreOptions.SCAN_MODE; +import static org.apache.paimon.CoreOptions.StartupMode.LATEST; + /** Bootstrap key index from Paimon table. 
*/ public class IndexBootstrap implements Serializable { private static final long serialVersionUID = 1L; + public static final String BUCKET_FIELD = "_BUCKET"; + private final Table table; public IndexBootstrap(Table table) { @@ -56,14 +70,42 @@ public class IndexBootstrap implements Serializable { .map(fieldNames::indexOf) .mapToInt(Integer::intValue) .toArray(); - ReadBuilder readBuilder = table.newReadBuilder().withProjection(projection); + + // force using the latest scan mode + ReadBuilder readBuilder = + table.copy(Collections.singletonMap(SCAN_MODE.key(), LATEST.toString())) + .newReadBuilder() + .withProjection(projection); AbstractInnerTableScan tableScan = (AbstractInnerTableScan) readBuilder.newScan(); TableScan.Plan plan = tableScan.withBucketFilter(bucket -> bucket % numAssigners == assignId).plan(); - try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) { - reader.forEachRemaining(collector); + for (Split split : plan.splits()) { + try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split)) { + int bucket = ((DataSplit) split).bucket(); + GenericRow bucketRow = GenericRow.of(bucket); + JoinedRow joinedRow = new JoinedRow(); + reader.transform(row -> joinedRow.replace(row, bucketRow)) + .forEachRemaining(collector); + } } } + + public static RowType bootstrapType(TableSchema schema) { + List<String> primaryKeys = schema.primaryKeys(); + List<String> partitionKeys = schema.partitionKeys(); + List<DataField> bootstrapFields = + new ArrayList<>( + schema.projectedLogicalRowType( + Stream.concat(primaryKeys.stream(), partitionKeys.stream()) + .collect(Collectors.toList())) + .getFields()); + bootstrapFields.add( + new DataField( + RowType.currentHighestFieldId(bootstrapFields) + 1, + BUCKET_FIELD, + DataTypes.INT().notNull())); + return new RowType(bootstrapFields); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java index 9535b3826..aff593007 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java @@ -45,7 +45,6 @@ public class KeyPartRowChannelComputer implements ChannelComputer<Tuple2<KeyPart public KeyPartRowChannelComputer( RowType rowType, RowType keyPartType, List<String> primaryKey) { - this.rowType = rowType; this.keyPartType = keyPartType; this.primaryKey = primaryKey; @@ -65,4 +64,9 @@ public class KeyPartRowChannelComputer implements ChannelComputer<Tuple2<KeyPart .apply(new FlinkRowWrapper(record.f1)); return Math.abs(key.hashCode() % numChannels); } + + @Override + public String toString() { + return "shuffle by key hash"; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java index 65c608ba4..794cb312f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java @@ -84,4 +84,23 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase { assertThat(sql("SELECT DISTINCT bucket 
FROM T$files")) .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2)); } + + @Test + public void testLargeRecords() { + sql( + "create table large_t (pt int, k int, v int, primary key (k) not enforced) partitioned by (pt) with (" + + "'bucket'='-1', " + + "'dynamic-bucket.target-row-num'='10000')"); + sql( + "create temporary table src (pt int, k int, v int) with (" + + "'connector'='datagen', " + + "'number-of-rows'='100000', " + + "'fields.k.min'='0', " + + "'fields.k.max'='100000', " + + "'fields.pt.min'='0', " + + "'fields.pt.max'='1')"); + sql("insert into large_t select * from src"); + sql("insert into large_t select * from src"); + assertThat(sql("select k, count(*) from large_t group by k having count(*) > 1")).isEmpty(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java index 81c9d1495..346e48b37 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java @@ -46,6 +46,7 @@ import org.apache.paimon.utils.TraceableFileIO; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; @@ -76,6 +77,7 @@ public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase { innerTestRandomCdcEvents(-1, false); } + @Disabled @Test @Timeout(120) public void testRandomCdcEventsGlobalDynamicBucket() throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java index cd185d1de..aadec017e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java @@ -65,15 +65,24 @@ public class IndexBootstrapTest extends TableTestBase { GenericRow.of(7, 7)); IndexBootstrap indexBootstrap = new IndexBootstrap(table); - List<Integer> result = new ArrayList<>(); - Consumer<InternalRow> consumer = row -> result.add(row.getInt(0)); + List<GenericRow> result = new ArrayList<>(); + Consumer<InternalRow> consumer = + row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1))); + + // output key and bucket indexBootstrap.bootstrap(2, 0, consumer); - assertThat(result).containsExactlyInAnyOrder(2, 3); + assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2, 4), GenericRow.of(3, 0)); result.clear(); indexBootstrap.bootstrap(2, 1, consumer); - assertThat(result).containsExactlyInAnyOrder(1, 4, 5, 6, 7); + assertThat(result) + .containsExactlyInAnyOrder( + GenericRow.of(1, 3), + GenericRow.of(4, 1), + GenericRow.of(5, 1), + GenericRow.of(6, 3), + GenericRow.of(7, 1)); result.clear(); } }
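
For reference, a minimal sketch of how the spillable ExternalBuffer added by this commit is driven, paraphrasing the new ExternalBufferTest above. The example class name and the temp-directory setup are illustrative only and not part of the patch; the API calls (add, complete, newIterator, reset) are the ones introduced in ExternalBuffer.

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;

import java.nio.file.Files;

import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;

public class ExternalBufferExample {

    public static void main(String[] args) throws Exception {
        // Spilling directory for the buffer; the test uses a JUnit @TempDir instead.
        IOManager ioManager = IOManager.create(Files.createTempDirectory("spill").toString());

        // Only two pages of memory, so writing enough rows forces a spill to disk,
        // which is what ExternalBufferTest#testSpill relies on.
        ExternalBuffer buffer =
                new ExternalBuffer(
                        ioManager,
                        new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
                        new BinaryRowSerializer(1));

        // Write 5000 single-column rows.
        for (long i = 0; i < 5000; i++) {
            BinaryRow row = new BinaryRow(1);
            BinaryRowWriter writer = new BinaryRowWriter(row);
            writer.reset();
            writer.writeLong(0, i);
            writer.complete();
            buffer.add(row);
        }

        // Reading requires complete() first; newIterator() checks for it.
        buffer.complete();
        try (ExternalBuffer.BufferIterator iterator = buffer.newIterator()) {
            while (iterator.advanceNext()) {
                long value = iterator.getRow().getLong(0);
                // ... consume value ...
            }
        }

        // reset() returns in-memory segments to the pool and deletes spilled channels.
        buffer.reset();
    }
}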