This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4fbfa1baf [flink] Fix global dynamic bucket mode (#1914)
4fbfa1baf is described below
commit 4fbfa1baf9fc6c9b64da2b9cd5ae731cc75d1671
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 31 19:07:37 2023 +0800
[flink] Fix global dynamic bucket mode (#1914)
---
.../data/serializer/InternalRowSerializer.java | 2 +-
.../paimon/disk/BufferFileReaderInputView.java | 99 ++++++
.../org/apache/paimon/disk/ExternalBuffer.java | 383 +++++++++++++++++++++
.../org/apache/paimon/disk/ExternalBufferTest.java | 209 +++++++++++
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 4 -
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 3 -
.../flink/sink/index/GlobalDynamicBucketSink.java | 16 +-
.../sink/index/GlobalDynamicCdcBucketSink.java | 133 -------
.../flink/sink/index/GlobalIndexAssigner.java | 29 +-
.../sink/index/GlobalIndexAssignerOperator.java | 107 ++++--
.../paimon/flink/sink/index/IndexBootstrap.java | 48 ++-
.../sink/index/KeyPartRowChannelComputer.java | 6 +-
.../flink/GlobalDynamicBucketTableITCase.java | 19 +
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 2 +
.../flink/sink/index/IndexBootstrapTest.java | 17 +-
15 files changed, 875 insertions(+), 202 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index 199d08b67..ab2ff7c8b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -72,7 +72,7 @@ public class InternalRowSerializer extends
AbstractRowDataSerializer<InternalRow
}
@Override
- public Serializer<InternalRow> duplicate() {
+ public InternalRowSerializer duplicate() {
Serializer<?>[] duplicateFieldSerializers = new
Serializer[fieldSerializers.length];
for (int i = 0; i < fieldSerializers.length; i++) {
duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
new file mode 100644
index 000000000..ab91b8944
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.Buffer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/** An {@link AbstractPagedInputView} which reads blocks from channel without compression. */
+public class BufferFileReaderInputView extends AbstractPagedInputView {
+
+ private final BufferFileReader reader;
+ private final MemorySegment segment;
+
+ private int currentSegmentLimit;
+
+ public BufferFileReaderInputView(FileIOChannel.ID id, IOManager ioManager, int segmentSize)
+ throws IOException {
+ this.reader = ioManager.createBufferFileReader(id);
+ this.segment = MemorySegment.wrap(new byte[segmentSize]);
+ }
+
+ @Override
+ protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+ if (reader.hasReachedEndOfFile()) {
+ throw new EOFException();
+ }
+
+ Buffer buffer = Buffer.create(segment);
+ reader.readInto(buffer);
+ this.currentSegmentLimit = buffer.getSize();
+ return segment;
+ }
+
+ @Override
+ protected int getLimitForSegment(MemorySegment segment) {
+ return currentSegmentLimit;
+ }
+
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ public FileIOChannel getChannel() {
+ return reader;
+ }
+
+ public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
+ BinaryRowSerializer serializer) {
+ return new BinaryRowChannelInputViewIterator(serializer);
+ }
+
+ private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {
+
+ protected final BinaryRowSerializer serializer;
+
+ public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public BinaryRow next(BinaryRow reuse) throws IOException {
+ try {
+ return this.serializer.deserializeFromPages(reuse, BufferFileReaderInputView.this);
+ } catch (EOFException e) {
+ close();
+ return null;
+ }
+ }
+
+ @Override
+ public BinaryRow next() throws IOException {
+ throw new UnsupportedOperationException(
+ "This method is disabled due to performance issue!");
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
new file mode 100644
index 000000000..9ced8e83d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.Buffer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** An external buffer for storing rows, it will spill the data to disk when the memory is full. */
+public class ExternalBuffer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class);
+
+ private final IOManager ioManager;
+ private final MemorySegmentPool pool;
+ private final BinaryRowSerializer binaryRowSerializer;
+ private final InMemoryBuffer inMemoryBuffer;
+
+ // The size of each segment
+ private final int segmentSize;
+
+ private final List<ChannelWithMeta> spilledChannelIDs;
+ private int numRows;
+
+ private boolean addCompleted;
+
+ public ExternalBuffer(
+ IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer<?> serializer) {
+ this.ioManager = ioManager;
+ this.pool = pool;
+
+ this.binaryRowSerializer =
+ serializer instanceof BinaryRowSerializer
+ ? (BinaryRowSerializer) serializer.duplicate()
+ : new BinaryRowSerializer(serializer.getArity());
+
+ this.segmentSize = pool.pageSize();
+
+ this.spilledChannelIDs = new ArrayList<>();
+
+ this.numRows = 0;
+
+ this.addCompleted = false;
+
+ //noinspection unchecked
+ this.inMemoryBuffer =
+ new InMemoryBuffer((AbstractRowDataSerializer<InternalRow>) serializer);
+ }
+
+ public void reset() {
+ clearChannels();
+ inMemoryBuffer.reset();
+ numRows = 0;
+ addCompleted = false;
+ }
+
+ public void add(InternalRow row) throws IOException {
+ checkState(!addCompleted, "This buffer has add completed.");
+ if (!inMemoryBuffer.write(row)) {
+ // Check if record is too big.
+ if (inMemoryBuffer.getCurrentDataBufferOffset() == 0) {
+ throwTooBigException(row);
+ }
+ spill();
+ if (!inMemoryBuffer.write(row)) {
+ throwTooBigException(row);
+ }
+ }
+
+ numRows++;
+ }
+
+ public void complete() {
+ addCompleted = true;
+ }
+
+ public BufferIterator newIterator() {
+ checkState(addCompleted, "This buffer has not add completed.");
+ return new BufferIterator();
+ }
+
+ private void throwTooBigException(InternalRow row) throws IOException {
+ int rowSize = inMemoryBuffer.serializer.toBinaryRow(row).toBytes().length;
+ throw new IOException(
+ "Record is too big, it can't be added to a empty InMemoryBuffer! "
+ + "Record size: "
+ + rowSize
+ + ", Buffer: "
+ + memorySize());
+ }
+
+ private void spill() throws IOException {
+ FileIOChannel.ID channel = ioManager.createChannel();
+
+ BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
+ int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
+ ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
+ try {
+ // spill in memory buffer in zero-copy.
+ for (int i = 0; i < numRecordBuffers; i++) {
+ MemorySegment segment = segments.get(i);
+ int bufferSize =
+ i == numRecordBuffers - 1
+ ? inMemoryBuffer.getNumBytesInLastBuffer()
+ : segment.size();
+ writer.writeBlock(Buffer.create(segment, bufferSize));
+ }
+ LOG.info(
+ "here spill the reset buffer data with {} records {} bytes",
+ inMemoryBuffer.numRecords,
+ writer.getSize());
+ writer.close();
+ } catch (IOException e) {
+ writer.closeAndDelete();
+ throw e;
+ }
+
+ spilledChannelIDs.add(
+ new ChannelWithMeta(
+ channel,
+ inMemoryBuffer.getNumRecordBuffers(),
+ inMemoryBuffer.getNumBytesInLastBuffer()));
+
+ inMemoryBuffer.reset();
+ }
+
+ public int size() {
+ return numRows;
+ }
+
+ private int memorySize() {
+ return pool.freePages() * segmentSize;
+ }
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private void clearChannels() {
+ for (ChannelWithMeta meta : spilledChannelIDs) {
+ final File f = new File(meta.getChannel().getPath());
+ if (f.exists()) {
+ f.delete();
+ }
+ }
+ spilledChannelIDs.clear();
+ }
+
+ /** Iterator of external buffer. */
+ public class BufferIterator implements Closeable {
+
+ private MutableObjectIterator<BinaryRow> currentIterator;
+ private final BinaryRow reuse = binaryRowSerializer.createInstance();
+
+ private int currentChannelID = -1;
+ private BinaryRow row;
+ private boolean closed;
+ private BufferFileReaderInputView channelReader;
+
+ private BufferIterator() {
+ this.closed = false;
+ }
+
+ private void checkValidity() {
+ if (closed) {
+ throw new RuntimeException("This iterator is closed!");
+ }
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+
+ try {
+ closeCurrentFileReader();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ closed = true;
+ }
+
+ public boolean advanceNext() {
+ checkValidity();
+
+ try {
+ // get from curr iterator or new iterator.
+ while (true) {
+ if (currentIterator != null && (row = currentIterator.next(reuse)) != null) {
+ return true;
+ } else {
+ if (!nextIterator()) {
+ return false;
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean nextIterator() throws IOException {
+ if (currentChannelID == Integer.MAX_VALUE) {
+ return false;
+ } else if (currentChannelID < spilledChannelIDs.size() - 1) {
+ nextSpilledIterator();
+ } else {
+ newMemoryIterator();
+ }
+ return true;
+ }
+
+ public BinaryRow getRow() {
+ return row;
+ }
+
+ private void closeCurrentFileReader() throws IOException {
+ if (channelReader != null) {
+ channelReader.close();
+ channelReader = null;
+ }
+ }
+
+ private void nextSpilledIterator() throws IOException {
+ ChannelWithMeta channel = spilledChannelIDs.get(currentChannelID + 1);
+ currentChannelID++;
+
+ // close current reader first.
+ closeCurrentFileReader();
+
+ // new reader.
+ this.channelReader =
+ new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize);
+ this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer);
+ }
+
+ private void newMemoryIterator() {
+ this.currentChannelID = Integer.MAX_VALUE;
+ this.currentIterator = inMemoryBuffer.newIterator();
+ }
+ }
+
+ @VisibleForTesting
+ List<ChannelWithMeta> getSpillChannels() {
+ return spilledChannelIDs;
+ }
+
+ private class InMemoryBuffer {
+
+ private final AbstractRowDataSerializer<InternalRow> serializer;
+ private final ArrayList<MemorySegment> recordBufferSegments;
+ private final SimpleCollectingOutputView recordCollector;
+
+ private long currentDataBufferOffset;
+ private int numBytesInLastBuffer;
+ private int numRecords = 0;
+
+ private InMemoryBuffer(AbstractRowDataSerializer<InternalRow> serializer) {
+ // serializer has states, so we must duplicate
+ this.serializer = (AbstractRowDataSerializer<InternalRow>) serializer.duplicate();
+ this.recordBufferSegments = new ArrayList<>();
+ this.recordCollector =
+ new SimpleCollectingOutputView(this.recordBufferSegments, pool, segmentSize);
+ }
+
+ private void reset() {
+ this.currentDataBufferOffset = 0;
+ this.numRecords = 0;
+ returnToSegmentPool();
+ this.recordCollector.reset();
+ }
+
+ private void returnToSegmentPool() {
+ pool.returnAll(this.recordBufferSegments);
+ this.recordBufferSegments.clear();
+ }
+
+ public boolean write(InternalRow row) throws IOException {
+ try {
+ this.serializer.serializeToPages(row, this.recordCollector);
+ currentDataBufferOffset = this.recordCollector.getCurrentOffset();
+ numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment();
+ numRecords++;
+ return true;
+ } catch (EOFException e) {
+ return false;
+ }
+ }
+
+ private ArrayList<MemorySegment> getRecordBufferSegments() {
+ return recordBufferSegments;
+ }
+
+ private long getCurrentDataBufferOffset() {
+ return currentDataBufferOffset;
+ }
+
+ private int getNumRecordBuffers() {
+ int result = (int) (currentDataBufferOffset / segmentSize);
+ long mod = currentDataBufferOffset % segmentSize;
+ if (mod != 0) {
+ result += 1;
+ }
+ return result;
+ }
+
+ private int getNumBytesInLastBuffer() {
+ return numBytesInLastBuffer;
+ }
+
+ private InMemoryBufferIterator newIterator() {
+ RandomAccessInputView recordBuffer =
+ new RandomAccessInputView(
+ this.recordBufferSegments, segmentSize, numBytesInLastBuffer);
+ return new InMemoryBufferIterator(recordBuffer, serializer);
+ }
+ }
+
+ private static class InMemoryBufferIterator
+ implements MutableObjectIterator<BinaryRow>, Closeable {
+
+ private final RandomAccessInputView recordBuffer;
+ private final AbstractRowDataSerializer<InternalRow> serializer;
+
+ private InMemoryBufferIterator(
+ RandomAccessInputView recordBuffer,
+ AbstractRowDataSerializer<InternalRow> serializer) {
+ this.recordBuffer = recordBuffer;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public BinaryRow next(BinaryRow reuse) throws IOException {
+ try {
+ return (BinaryRow) serializer.mapFromPages(reuse, recordBuffer);
+ } catch (EOFException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public BinaryRow next() throws IOException {
+ throw new RuntimeException("Not support!");
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
new file mode 100644
index 000000000..5bd9ad511
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ExternalBuffer}. */
+public class ExternalBufferTest {
+
+ @TempDir Path tempDir;
+
+ private IOManager ioManager;
+ private Random random;
+ private BinaryRowSerializer serializer;
+
+ @BeforeEach
+ public void before() {
+ this.ioManager = IOManager.create(tempDir.toString());
+ this.random = new Random();
+ this.serializer = new BinaryRowSerializer(1);
+ }
+
+ private ExternalBuffer newBuffer() {
+ return new ExternalBuffer(
+ ioManager,
+ new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
+ this.serializer);
+ }
+
+ @Test
+ public void testLess() throws Exception {
+ ExternalBuffer buffer = newBuffer();
+
+ int number = 100;
+ List<Long> expected = insertMulti(buffer, number);
+ assertThat(number).isEqualTo(buffer.size());
+ assertBuffer(expected, buffer);
+ assertThat(0).isEqualTo(buffer.getSpillChannels().size());
+
+ // repeat read
+ assertBuffer(expected, buffer);
+ buffer.newIterator();
+ assertBuffer(expected, buffer);
+ buffer.reset();
+ }
+
+ @Test
+ public void testSpill() throws Exception {
+ ExternalBuffer buffer = newBuffer();
+
+ int number = 5000; // 16 * 5000
+ List<Long> expected = insertMulti(buffer, number);
+ assertThat(number).isEqualTo(buffer.size());
+ assertBuffer(expected, buffer);
+ assertThat(buffer.getSpillChannels().size()).isGreaterThan(0);
+
+ // repeat read
+ assertBuffer(expected, buffer);
+ buffer.newIterator();
+ assertBuffer(expected, buffer);
+ buffer.reset();
+ }
+
+ @Test
+ public void testBufferReset() throws Exception {
+ ExternalBuffer buffer = newBuffer();
+
+ // less
+ insertMulti(buffer, 10);
+ buffer.reset();
+ assertThat(0).isEqualTo(buffer.size());
+
+ // not spill
+ List<Long> expected = insertMulti(buffer, 100);
+ assertThat(100).isEqualTo(buffer.size());
+ assertBuffer(expected, buffer);
+ buffer.reset();
+
+ // spill
+ expected = insertMulti(buffer, 2500);
+ assertThat(2500).isEqualTo(buffer.size());
+ assertBuffer(expected, buffer);
+ buffer.reset();
+ }
+
+ @Test
+ public void testBufferResetWithSpill() throws Exception {
+ int inMemoryThreshold = 20;
+ ExternalBuffer buffer = newBuffer();
+
+ // spill
+ List<Long> expected = insertMulti(buffer, 5000);
+ assertThat(5000).isEqualTo(buffer.size());
+ assertBuffer(expected, buffer);
+ buffer.reset();
+
+ // spill, but not read the values
+ insertMulti(buffer, 5000);
+ buffer.newIterator();
+ assertThat(5000).isEqualTo(buffer.size());
+ buffer.reset();
+
+ // not spill
+ expected = insertMulti(buffer, inMemoryThreshold / 2);
+ assertBuffer(expected, buffer);
+ buffer.reset();
+ assertThat(0).isEqualTo(buffer.size());
+
+ // less
+ expected = insertMulti(buffer, 100);
+ assertThat(100).isEqualTo(buffer.size());
+ assertBuffer(expected, buffer);
+ buffer.reset();
+ }
+
+ @Test
+ public void testHugeRecord() {
+ ExternalBuffer buffer =
+ new ExternalBuffer(
+ ioManager,
+ new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
+ new BinaryRowSerializer(1));
+ assertThatThrownBy(() -> writeHuge(buffer)).isInstanceOf(IOException.class);
+ buffer.reset();
+ }
+
+ private void writeHuge(ExternalBuffer buffer) throws IOException {
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.reset();
+ RandomDataGenerator random = new RandomDataGenerator();
+ writer.writeString(0, BinaryString.fromString(random.nextHexString(500000)));
+ writer.complete();
+ buffer.add(row);
+ }
+
+ private void assertBuffer(List<Long> expected, ExternalBuffer buffer) {
+ ExternalBuffer.BufferIterator iterator = buffer.newIterator();
+ assertBuffer(expected, iterator);
+ iterator.close();
+ }
+
+ private void assertBuffer(List<Long> expected, ExternalBuffer.BufferIterator iterator) {
+ List<Long> values = new ArrayList<>();
+ while (iterator.advanceNext()) {
+ values.add(iterator.getRow().getLong(0));
+ }
+ assertThat(values).isEqualTo(expected);
+ }
+
+ private List<Long> insertMulti(ExternalBuffer buffer, int cnt) throws IOException {
+ ArrayList<Long> expected = new ArrayList<>(cnt);
+ insertMulti(buffer, cnt, expected);
+ buffer.complete();
+ return expected;
+ }
+
+ private void insertMulti(ExternalBuffer buffer, int cnt, List<Long> expected)
+ throws IOException {
+ for (int i = 0; i < cnt; i++) {
+ expected.add(randomInsert(buffer));
+ }
+ }
+
+ private long randomInsert(ExternalBuffer buffer) throws IOException {
+ long l = random.nextLong();
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.reset();
+ writer.writeLong(0, l);
+ writer.complete();
+ buffer.add(row);
+ return l;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 4f1745406..1bda75695 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -119,9 +118,6 @@ public class CdcSinkBuilder<T> {
return buildForFixedBucket(parsed);
case DYNAMIC:
return new CdcDynamicBucketSink((FileStoreTable)
table).build(parsed, parallelism);
- case GLOBAL_DYNAMIC:
- return new GlobalDynamicCdcBucketSink((FileStoreTable) table)
- .build(parsed, parallelism);
case UNAWARE:
default:
throw new UnsupportedOperationException("Unsupported bucket
mode: " + bucketMode);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 3b775f700..552ec40ef 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
-import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -196,8 +195,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
new CdcDynamicBucketSink(table).build(parsedForTable,
parallelism);
break;
case GLOBAL_DYNAMIC:
- new
GlobalDynamicCdcBucketSink(table).build(parsedForTable, parallelism);
- break;
case UNAWARE:
default:
throw new UnsupportedOperationException(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 5f98f3b05..26cfc9b41 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -43,11 +43,10 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+import static org.apache.paimon.flink.sink.index.IndexBootstrap.bootstrapType;
/** Sink for global dynamic bucket table. */
public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData,
Integer>> {
@@ -71,13 +70,10 @@ public class GlobalDynamicBucketSink extends
FlinkWriteSink<Tuple2<RowData, Inte
TableSchema schema = table.schema();
RowType rowType = schema.logicalRowType();
List<String> primaryKeys = schema.primaryKeys();
- List<String> partitionKeys = schema.partitionKeys();
RowDataSerializer rowSerializer = new
RowDataSerializer(toLogicalType(rowType));
- RowType keyPartType =
- schema.projectedLogicalRowType(
- Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
- .collect(Collectors.toList()));
- RowDataSerializer keyPartSerializer = new
RowDataSerializer(toLogicalType(keyPartType));
+
+ RowType bootstrapType = bootstrapType(schema);
+ RowDataSerializer bootstrapSerializer = new
RowDataSerializer(toLogicalType(bootstrapType));
// Topology:
// input -- bootstrap -- shuffle by key hash --> bucket-assigner --
shuffle by bucket -->
@@ -88,7 +84,7 @@ public class GlobalDynamicBucketSink extends
FlinkWriteSink<Tuple2<RowData, Inte
"INDEX_BOOTSTRAP",
new InternalTypeInfo<>(
new KeyWithRowSerializer<>(
- keyPartSerializer,
rowSerializer)),
+ bootstrapSerializer,
rowSerializer)),
new IndexBootstrapOperator<>(
new IndexBootstrap(table),
FlinkRowData::new))
.setParallelism(input.getParallelism());
@@ -100,7 +96,7 @@ public class GlobalDynamicBucketSink extends
FlinkWriteSink<Tuple2<RowData, Inte
}
KeyPartRowChannelComputer channelComputer =
- new KeyPartRowChannelComputer(rowType, keyPartType,
primaryKeys);
+ new KeyPartRowChannelComputer(rowType, bootstrapType,
primaryKeys);
DataStream<Tuple2<KeyPartOrRow, RowData>> partitionByKeyHash =
partition(bootstraped, channelComputer, assignerParallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
deleted file mode 100644
index 06bb5a147..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.index;
-
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.sink.ChannelComputer;
-import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.FlinkWriteSink;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketWriteOperator;
-import org.apache.paimon.flink.sink.cdc.CdcHashKeyChannelComputer;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
-import org.apache.paimon.flink.sink.cdc.CdcWithBucketChannelComputer;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.EnumTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
-
-/** Sink for global dynamic bucket table and {@link CdcRecord} inputs. */
-public class GlobalDynamicCdcBucketSink extends
FlinkWriteSink<Tuple2<CdcRecord, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- public GlobalDynamicCdcBucketSink(FileStoreTable table) {
- super(table, null);
- }
-
- @Override
- protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable>
createWriteOperator(
- StoreSinkWrite.Provider writeProvider, String commitUser) {
- return new CdcDynamicBucketWriteOperator(table, writeProvider,
commitUser);
- }
-
- public DataStreamSink<?> build(DataStream<CdcRecord> input, @Nullable
Integer parallelism) {
- String initialCommitUser = UUID.randomUUID().toString();
-
- TableSchema schema = table.schema();
- List<String> primaryKeys = schema.primaryKeys();
- List<String> partitionKeys = schema.partitionKeys();
- RowType keyPartType =
- schema.projectedLogicalRowType(
- Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
- .collect(Collectors.toList()));
- List<String> keyPartNames = keyPartType.getFieldNames();
- RowDataToObjectArrayConverter keyPartConverter =
- new RowDataToObjectArrayConverter(keyPartType);
-
- // Topology:
- // input -- bootstrap -- shuffle by key hash --> bucket-assigner --
shuffle by bucket -->
- // writer --> committer
-
- TupleTypeInfo<Tuple2<KeyPartOrRow, CdcRecord>> tuple2TupleType =
- new TupleTypeInfo<>(new EnumTypeInfo<>(KeyPartOrRow.class),
input.getType());
- DataStream<Tuple2<KeyPartOrRow, CdcRecord>> bootstraped =
- input.transform(
- "INDEX_BOOTSTRAP",
- tuple2TupleType,
- new IndexBootstrapOperator<>(
- new IndexBootstrap(table),
- row ->
- CdcRecordUtils.fromGenericRow(
- GenericRow.ofKind(
-
row.getRowKind(),
-
keyPartConverter.convert(row)),
- keyPartNames)))
- .setParallelism(input.getParallelism());
-
- // 1. shuffle by key hash
- Integer assignerParallelism =
table.coreOptions().dynamicBucketAssignerParallelism();
- if (assignerParallelism == null) {
- assignerParallelism = parallelism;
- }
-
- ChannelComputer<Tuple2<KeyPartOrRow, CdcRecord>> channelComputer =
- ChannelComputer.transform(
- new CdcHashKeyChannelComputer(schema), tuple2 ->
tuple2.f1);
- DataStream<Tuple2<KeyPartOrRow, CdcRecord>> partitionByKeyHash =
- partition(bootstraped, channelComputer, assignerParallelism);
-
- // 2. bucket-assigner
- TupleTypeInfo<Tuple2<CdcRecord, Integer>> rowWithBucketType =
- new TupleTypeInfo<>(input.getType(),
BasicTypeInfo.INT_TYPE_INFO);
- DataStream<Tuple2<CdcRecord, Integer>> bucketAssigned =
- partitionByKeyHash
- .transform(
- "dynamic-bucket-assigner",
- rowWithBucketType,
-
GlobalIndexAssignerOperator.forCdcRecord(table))
- .setParallelism(partitionByKeyHash.getParallelism());
-
- // 3. shuffle by bucket
-
- DataStream<Tuple2<CdcRecord, Integer>> partitionByBucket =
- partition(bucketAssigned, new
CdcWithBucketChannelComputer(schema), parallelism);
-
- // 4. writer and committer
- return sinkFrom(partitionByBucket, initialCommitUser);
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index 2d3be0a12..78340b246 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -57,7 +57,8 @@ public class GlobalIndexAssigner<T> implements Serializable {
private final AbstractFileStoreTable table;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
- keyPartExtractorFunction;
+ bootstrapExtractorFunction;
+ private final SerializableFunction<T, Integer> bootstrapBucketFunction;
private final SerBiFunction<T, BinaryRow, T> setPartition;
private final SerBiFunction<T, RowKind, T> setRowKind;
@@ -78,12 +79,14 @@ public class GlobalIndexAssigner<T> implements Serializable
{
public GlobalIndexAssigner(
Table table,
SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction,
- SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
keyPartExtractorFunction,
+ SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
bootstrapExtractorFunction,
+ SerializableFunction<T, Integer> bootstrapBucketFunction,
SerBiFunction<T, BinaryRow, T> setPartition,
SerBiFunction<T, RowKind, T> setRowKind) {
this.table = (AbstractFileStoreTable) table;
this.extractorFunction = extractorFunction;
- this.keyPartExtractorFunction = keyPartExtractorFunction;
+ this.bootstrapExtractorFunction = bootstrapExtractorFunction;
+ this.bootstrapBucketFunction = bootstrapBucketFunction;
this.setPartition = setPartition;
this.setRowKind = setRowKind;
}
@@ -97,7 +100,7 @@ public class GlobalIndexAssigner<T> implements Serializable {
CoreOptions coreOptions = table.coreOptions();
this.targetBucketRowNumber = (int)
coreOptions.dynamicBucketTargetRowNum();
this.extractor = extractorFunction.apply(table.schema());
- this.keyPartExtractor = keyPartExtractorFunction.apply(table.schema());
+ this.keyPartExtractor =
bootstrapExtractorFunction.apply(table.schema());
// state
Options options = coreOptions.toConfiguration();
@@ -164,9 +167,12 @@ public class GlobalIndexAssigner<T> implements
Serializable {
public void bootstrap(T value) throws IOException {
BinaryRow partition = keyPartExtractor.partition(value);
- keyIndex.put(
- keyPartExtractor.trimmedPrimaryKey(value),
- new PositiveIntInt(partMapping.index(partition),
assignBucket(partition)));
+ BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value);
+ int partId = partMapping.index(partition);
+ int bucket = bootstrapBucketFunction.apply(value);
+ bucketAssigner.bootstrapBucket(partition, bucket);
+ PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
+ keyIndex.put(key, partAndBucket);
}
private void processNewRecord(BinaryRow partition, int partId, BinaryRow
key, T value)
@@ -207,6 +213,15 @@ public class GlobalIndexAssigner<T> implements
Serializable {
private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new
HashMap<>();
+ public void bootstrapBucket(BinaryRow part, int bucket) {
+ TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+ Integer count = bucketMap.get(bucket);
+ if (count == null) {
+ count = 0;
+ }
+ bucketMap.put(bucket, count + 1);
+ }
+
public int assignBucket(BinaryRow part, Filter<Integer> filter, int
maxCount) {
TreeMap<Integer, Integer> bucketMap = bucketMap(part);
for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 5d5acd203..0c10b01ff 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -18,56 +18,81 @@
package org.apache.paimon.flink.sink.index;
-import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.ExternalBuffer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.CdcRecordPartitionKeyExtractor;
-import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
/** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple2<T, Integer>>
- implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T,
Integer>> {
+ implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T,
Integer>>,
+ BoundedOneInput {
private static final long serialVersionUID = 1L;
+ private final Table table;
private final GlobalIndexAssigner<T> assigner;
+ private final SerializableFunction<T, InternalRow> toRow;
+ private final SerializableFunction<InternalRow, T> fromRow;
- public GlobalIndexAssignerOperator(GlobalIndexAssigner<T> assigner) {
+ private transient ExternalBuffer bootstrapBuffer;
+
+ public GlobalIndexAssignerOperator(
+ Table table,
+ GlobalIndexAssigner<T> assigner,
+ SerializableFunction<T, InternalRow> toRow,
+ SerializableFunction<InternalRow, T> fromRow) {
+ this.table = table;
this.assigner = assigner;
+ this.toRow = toRow;
+ this.fromRow = fromRow;
}
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
- File[] tmpDirs =
-
getContainingTask().getEnvironment().getIOManager().getSpillingDirectories();
+ org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager =
+ getContainingTask().getEnvironment().getIOManager();
+ File[] tmpDirs = ioManager.getSpillingDirectories();
File tmpDir =
tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
assigner.open(
tmpDir,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask(),
this::collect);
+ Options options = Options.fromMap(table.options());
+ long bufferSize =
options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
+ long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
+ bootstrapBuffer =
+ new ExternalBuffer(
+
IOManager.create(ioManager.getSpillingDirectoriesPaths()),
+ new HeapMemorySegmentPool(bufferSize, (int) pageSize),
+ new InternalRowSerializer(table.rowType()));
}
@Override
@@ -80,11 +105,38 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
assigner.bootstrap(value);
break;
case ROW:
- assigner.process(value);
+ if (bootstrapBuffer != null) {
+ bootstrapBuffer.add(toRow.apply(value));
+ } else {
+ assigner.process(value);
+ }
break;
}
}
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ endBootstrap();
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ endBootstrap();
+ }
+
+ private void endBootstrap() throws Exception {
+ if (bootstrapBuffer != null) {
+ bootstrapBuffer.complete();
+ try (ExternalBuffer.BufferIterator iterator =
bootstrapBuffer.newIterator()) {
+ while (iterator.advanceNext()) {
+ assigner.process(fromRow.apply(iterator.getRow()));
+ }
+ }
+ bootstrapBuffer.reset();
+ bootstrapBuffer = null;
+ }
+ }
+
private void collect(T value, int bucket) {
output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
}
@@ -95,39 +147,22 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
}
public static GlobalIndexAssignerOperator<RowData> forRowData(Table table)
{
- return new GlobalIndexAssignerOperator<>(createRowDataAssigner(table));
+ return new GlobalIndexAssignerOperator<>(
+ table, createRowDataAssigner(table), FlinkRowWrapper::new,
FlinkRowData::new);
}
public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
+ RowType bootstrapType =
IndexBootstrap.bootstrapType(((AbstractFileStoreTable) t).schema());
+ int bucketIndex = bootstrapType.getFieldCount() - 1;
return new GlobalIndexAssigner<>(
t,
RowDataPartitionKeyExtractor::new,
KeyPartPartitionKeyExtractor::new,
+ row -> row.getInt(bucketIndex),
new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()),
(rowData, rowKind) -> {
rowData.setRowKind(toFlinkRowKind(rowKind));
return rowData;
});
}
-
- public static GlobalIndexAssignerOperator<CdcRecord> forCdcRecord(Table
table) {
- RowType partitionType = ((FileStoreTable)
table).schema().logicalPartitionType();
- List<String> partitionNames = partitionType.getFieldNames();
- RowDataToObjectArrayConverter converter = new
RowDataToObjectArrayConverter(partitionType);
- GlobalIndexAssigner<CdcRecord> assigner =
- new GlobalIndexAssigner<>(
- table,
- CdcRecordPartitionKeyExtractor::new,
- CdcRecordPartitionKeyExtractor::new,
- (record, part) -> {
- CdcRecord partCdc =
- CdcRecordUtils.fromGenericRow(
-
GenericRow.of(converter.convert(part)), partitionNames);
- Map<String, String> fields = new
HashMap<>(record.fields());
- fields.putAll(partCdc.fields());
- return new CdcRecord(record.kind(), fields);
- },
- CdcRecord::setRowKind);
- return new GlobalIndexAssignerOperator<>(assigner);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
index 2df812007..95b4c1a94 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -18,26 +18,40 @@
package org.apache.paimon.flink.sink.index;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.AbstractInnerTableScan;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.StartupMode.LATEST;
+
/** Bootstrap key index from Paimon table. */
public class IndexBootstrap implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final String BUCKET_FIELD = "_BUCKET";
+
private final Table table;
public IndexBootstrap(Table table) {
@@ -56,14 +70,42 @@ public class IndexBootstrap implements Serializable {
.map(fieldNames::indexOf)
.mapToInt(Integer::intValue)
.toArray();
- ReadBuilder readBuilder =
table.newReadBuilder().withProjection(projection);
+
+ // force using the latest scan mode
+ ReadBuilder readBuilder =
+ table.copy(Collections.singletonMap(SCAN_MODE.key(),
LATEST.toString()))
+ .newReadBuilder()
+ .withProjection(projection);
AbstractInnerTableScan tableScan = (AbstractInnerTableScan)
readBuilder.newScan();
TableScan.Plan plan =
tableScan.withBucketFilter(bucket -> bucket % numAssigners ==
assignId).plan();
- try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(plan)) {
- reader.forEachRemaining(collector);
+ for (Split split : plan.splits()) {
+ try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(split)) {
+ int bucket = ((DataSplit) split).bucket();
+ GenericRow bucketRow = GenericRow.of(bucket);
+ JoinedRow joinedRow = new JoinedRow();
+ reader.transform(row -> joinedRow.replace(row, bucketRow))
+ .forEachRemaining(collector);
+ }
}
}
+
+ public static RowType bootstrapType(TableSchema schema) {
+ List<String> primaryKeys = schema.primaryKeys();
+ List<String> partitionKeys = schema.partitionKeys();
+ List<DataField> bootstrapFields =
+ new ArrayList<>(
+ schema.projectedLogicalRowType(
+ Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
+ .collect(Collectors.toList()))
+ .getFields());
+ bootstrapFields.add(
+ new DataField(
+ RowType.currentHighestFieldId(bootstrapFields) + 1,
+ BUCKET_FIELD,
+ DataTypes.INT().notNull()));
+ return new RowType(bootstrapFields);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
index 9535b3826..aff593007 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
@@ -45,7 +45,6 @@ public class KeyPartRowChannelComputer implements
ChannelComputer<Tuple2<KeyPart
public KeyPartRowChannelComputer(
RowType rowType, RowType keyPartType, List<String> primaryKey) {
-
this.rowType = rowType;
this.keyPartType = keyPartType;
this.primaryKey = primaryKey;
@@ -65,4 +64,9 @@ public class KeyPartRowChannelComputer implements
ChannelComputer<Tuple2<KeyPart
.apply(new FlinkRowWrapper(record.f1));
return Math.abs(key.hashCode() % numChannels);
}
+
+ @Override
+ public String toString() {
+ return "shuffle by key hash";
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index 65c608ba4..794cb312f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -84,4 +84,23 @@ public class GlobalDynamicBucketTableITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
.containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
}
+
+ @Test
+ public void testLargeRecords() {
+ sql(
+ "create table large_t (pt int, k int, v int, primary key (k)
not enforced) partitioned by (pt) with ("
+ + "'bucket'='-1', "
+ + "'dynamic-bucket.target-row-num'='10000')");
+ sql(
+ "create temporary table src (pt int, k int, v int) with ("
+ + "'connector'='datagen', "
+ + "'number-of-rows'='100000', "
+ + "'fields.k.min'='0', "
+ + "'fields.k.max'='100000', "
+ + "'fields.pt.min'='0', "
+ + "'fields.pt.max'='1')");
+ sql("insert into large_t select * from src");
+ sql("insert into large_t select * from src");
+ assertThat(sql("select k, count(*) from large_t group by k having
count(*) > 1")).isEmpty();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 81c9d1495..346e48b37 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -46,6 +46,7 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
@@ -76,6 +77,7 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
innerTestRandomCdcEvents(-1, false);
}
+ @Disabled
@Test
@Timeout(120)
public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
index cd185d1de..aadec017e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
@@ -65,15 +65,24 @@ public class IndexBootstrapTest extends TableTestBase {
GenericRow.of(7, 7));
IndexBootstrap indexBootstrap = new IndexBootstrap(table);
- List<Integer> result = new ArrayList<>();
- Consumer<InternalRow> consumer = row -> result.add(row.getInt(0));
+ List<GenericRow> result = new ArrayList<>();
+ Consumer<InternalRow> consumer =
+ row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1)));
+
+ // output key and bucket
indexBootstrap.bootstrap(2, 0, consumer);
- assertThat(result).containsExactlyInAnyOrder(2, 3);
+ assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2, 4),
GenericRow.of(3, 0));
result.clear();
indexBootstrap.bootstrap(2, 1, consumer);
- assertThat(result).containsExactlyInAnyOrder(1, 4, 5, 6, 7);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 3),
+ GenericRow.of(4, 1),
+ GenericRow.of(5, 1),
+ GenericRow.of(6, 3),
+ GenericRow.of(7, 1));
result.clear();
}
}