luoyuxia commented on code in PR #1543: URL: https://github.com/apache/fluss/pull/1543#discussion_r2281251290
########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.StringType; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SortMergeReader}. */ +public class SortMergeReaderTest { Review Comment: nit: ```suggestion class SortMergeReaderTest { ``` ########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.StringType; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SortMergeReader}. */ +public class SortMergeReaderTest { + + private static class FlussRowComparator implements Comparator<InternalRow> { + + private final int keyIndex; + + public FlussRowComparator(int keyIndex) { + this.keyIndex = keyIndex; + } + + @Override + public int compare(InternalRow o1, InternalRow o2) { + int compare = o1.getInt(keyIndex) - o2.getInt(keyIndex); + return compare; + } + } + + @Test + void testReadBatch() { Review Comment: Can `testReadBatch` and `testReadBatchWithProjectedFields` be combined into one test method, since most of the logic is the same? ########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.TableBucket; + +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; + +import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Test case for {@link LakeSplitSerializer}. */ +public class LakeSplitSerializerTest { + private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1; + + private final SimpleVersionedSerializer<LakeSplit> mockSourceSerializer = + Mockito.mock(SimpleVersionedSerializer.class); Review Comment: Let's not use Mockito; it'll make the code hard to maintain. ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java: ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** A sort merge reader to merge lakehouse snapshot record and fluss change log.
*/ +class SortMergeReader { + + private final ProjectedRow snapshotProjectedPkRow; + private final CloseableIterator<LogRecord> lakeRecordIterator; + private final Comparator<InternalRow> userKeyComparator; + private CloseableIterator<KeyValueRow> changeLogIterator; + + private final SnapshotMergedRowIteratorWrapper snapshotMergedRowIteratorWrapper; + + private final ChangeLogIteratorWrapper changeLogIteratorWrapper; + private @Nullable final ProjectedRow projectedRow; + + public SortMergeReader( + @Nullable int[] projectedFields, + int[] pkIndexes, + List<CloseableIterator<LogRecord>> lakeRecordIterators, + Comparator<InternalRow> userKeyComparator, + CloseableIterator<KeyValueRow> changeLogIterator) { + this.userKeyComparator = userKeyComparator; + this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes); + this.lakeRecordIterator = + ConcatRecordIterator.wrap(lakeRecordIterators, userKeyComparator, pkIndexes); + this.changeLogIterator = changeLogIterator; + this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper(); + this.snapshotMergedRowIteratorWrapper = new SnapshotMergedRowIteratorWrapper(); + // to project to fields provided by user + this.projectedRow = projectedFields == null ? null : ProjectedRow.from(projectedFields); + } + + @Nullable + public CloseableIterator<InternalRow> readBatch() { + if (!lakeRecordIterator.hasNext()) { + return changeLogIterator.hasNext() + ? changeLogIteratorWrapper.replace(changeLogIterator) + : null; + } else { + CloseableIterator<SortMergeRows> mergedRecordIterator = + transform(lakeRecordIterator, this::sortMergeWithChangeLog); + + return snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator); + } + } + + /** A concat record iterator to concat multiple record iterator. */ + private static class ConcatRecordIterator implements CloseableIterator<LogRecord> { + private final Queue<CloseableIterator<LogRecord>> iteratorQueue; + private final ProjectedRow snapshotProjectedPkRow1; + private final ProjectedRow snapshotProjectedPkRow2; + + public ConcatRecordIterator( + List<CloseableIterator<LogRecord>> iteratorList, + int[] pkIndexes, + Comparator<InternalRow> comparator) { + this.snapshotProjectedPkRow1 = ProjectedRow.from(pkIndexes); + this.snapshotProjectedPkRow2 = ProjectedRow.from(pkIndexes); + this.iteratorQueue = + iteratorList.stream() + .filter(Iterator::hasNext) + .map( + iterator -> + SingleElementHeadIterator.addElementToHead( + iterator.next(), iterator)) + .sorted( + (s1, s2) -> + comparator.compare( + getComparableRow(s1, snapshotProjectedPkRow1), + getComparableRow(s2, snapshotProjectedPkRow2))) + .collect(Collectors.toCollection(LinkedList::new)); + } + + public static CloseableIterator<LogRecord> wrap( + List<CloseableIterator<LogRecord>> iteratorList, + Comparator<InternalRow> comparator, + int[] pkIndexes) { + if (iteratorList.isEmpty()) { + return CloseableIterator.wrap(Collections.emptyIterator()); + } + return new ConcatRecordIterator(iteratorList, pkIndexes, comparator); + } + + private InternalRow getComparableRow( + SingleElementHeadIterator<LogRecord> iterator, ProjectedRow projectedRow) { + return projectedRow.replaceRow(iterator.peek().getRow()); + } + + @Override + public void close() { + while (!iteratorQueue.isEmpty()) { + iteratorQueue.poll().close(); + } + } + + @Override + public boolean hasNext() { + while (!iteratorQueue.isEmpty()) { + CloseableIterator<LogRecord> iterator = iteratorQueue.peek(); + if (iterator.hasNext()) { + return true; + } + iteratorQueue.poll().close(); + } + return false; + } + + 
@Override + public LogRecord next() { + return iteratorQueue.peek().next(); Review Comment: nit: ``` if (!hasNext()) { throw new NoSuchElementException(); } return iteratorQueue.peek().next(); ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.client.table.scanner.log.LogScanner; +import com.alibaba.fluss.client.table.scanner.log.ScanRecords; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** A scanner to merge the lakehouse's snapshot and change log. 
*/ +public class LakeSnapshotAndLogSplitScanner implements BatchScanner { + + private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private Comparator<InternalRow> rowComparator; + private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>(); + private final LakeSource<LakeSplit> lakeSource; + + private final int[] pkIndexes; + + // the indexes of primary key in emitted row by paimon and fluss + private int[] keyIndexesInRow; + @Nullable private int[] adjustProjectedFields; + private int[] newProjectedFields; + + // the sorted logs in memory, mapping from key -> value + private SortedMap<InternalRow, KeyValueRow> logRows; + + private final LogScanner logScanner; + private final long stoppingOffset; + private boolean logScanFinished; + + private SortMergeReader currentSortMergeReader; + + public LakeSnapshotAndLogSplitScanner( + Table table, + LakeSource<LakeSplit> lakeSource, + LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit) { + this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); + this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSource = lakeSource; + this.newProjectedFields = getNeedProjectFields(table); + + this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); + this.lakeSource.withProject( + Arrays.stream(newProjectedFields) + .mapToObj(field -> new int[] {field}) + .toArray(int[][]::new)); + + TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); + if (tableBucket.getPartitionId() != null) { + this.logScanner.subscribe( + tableBucket.getPartitionId(), + tableBucket.getBucket(), + lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } else { + this.logScanner.subscribe( + tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } + + this.stoppingOffset = + lakeSnapshotAndFlussLogSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSnapshotAndFlussLogSplit)); + + this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset; + } + + private int[] getNeedProjectFields(Table flussTable) { + if (lakeSource.getProject() != null) { + int[] projectedFields = + Arrays.stream(lakeSource.getProject()).mapToInt(field -> field[0]).toArray(); + // we need to include the primary key in projected fields to sort merge by pk + // if the provided don't include, we need to include it + List<Integer> newProjectedFields = + Arrays.stream(projectedFields).boxed().collect(Collectors.toList()); + + // the indexes of primary key with new projected fields + keyIndexesInRow = new int[pkIndexes.length]; + for (int i = 0; i < pkIndexes.length; i++) { + int primaryKeyIndex = pkIndexes[i]; + // search the pk in projected fields + int indexInProjectedFields = findIndex(projectedFields, primaryKeyIndex); + if (indexInProjectedFields >= 0) { + keyIndexesInRow[i] = indexInProjectedFields; + } else { + // no pk in projected fields, we must include it to do + // merge sort + newProjectedFields.add(primaryKeyIndex); + keyIndexesInRow[i] = newProjectedFields.size() - 1; + } + } + int[] newProjection = newProjectedFields.stream().mapToInt(Integer::intValue).toArray(); + // the underlying scan will use the new projection to scan data, + // but will still need to map from the new projection to the origin projected fields + int[] adjustProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + 
adjustProjectedFields[i] = findIndex(newProjection, projectedFields[i]); + } + this.adjustProjectedFields = adjustProjectedFields; + return newProjection; + } else { + // no projectedFields, use all fields + keyIndexesInRow = pkIndexes; + return IntStream.range(0, flussTable.getTableInfo().getRowType().getFieldCount()) + .toArray(); + } + } + + private int findIndex(int[] array, int target) { + int index = -1; + for (int i = 0; i < array.length; i++) { + if (array[i] == target) { + index = i; + break; + } + } + return index; + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + if (logScanFinished) { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); + } + for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + lakeRecordIterators.add(lakeSource.createRecordReader(() -> lakeSplit).read()); + } + } + if (currentSortMergeReader == null) { + currentSortMergeReader = + new SortMergeReader( + adjustProjectedFields, + keyIndexesInRow, + lakeRecordIterators, + rowComparator, + CloseableIterator.wrap( + logRows == null + ? Collections.emptyIterator() + : logRows.values().iterator())); + } + return currentSortMergeReader.readBatch(); + } else { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); + } + for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { Review Comment: ``` else { for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { RecordReader reader = lakeSource.createRecordReader(() -> lakeSplit); if (reader instanceof SortedRecordReader) { rowComparator = ((SortedRecordReader) reader).order(); } else { throw new UnsupportedOperationException( "lake records must instance of sorted view."); } lakeRecordIterators.add(reader.read()); } } ``` ########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.StringType; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SortMergeReader}. */ +public class SortMergeReaderTest { + + private static class FlussRowComparator implements Comparator<InternalRow> { + + private final int keyIndex; + + public FlussRowComparator(int keyIndex) { + this.keyIndex = keyIndex; + } + + @Override + public int compare(InternalRow o1, InternalRow o2) { + int compare = o1.getInt(keyIndex) - o2.getInt(keyIndex); Review Comment: nit: ```suggestion return o1.getInt(keyIndex) - o2.getInt(keyIndex); ``` ########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.StringType; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link SortMergeReader}. 
*/ +public class SortMergeReaderTest { + + private static class FlussRowComparator implements Comparator<InternalRow> { + + private final int keyIndex; + + public FlussRowComparator(int keyIndex) { + this.keyIndex = keyIndex; + } + + @Override + public int compare(InternalRow o1, InternalRow o2) { + int compare = o1.getInt(keyIndex) - o2.getInt(keyIndex); + return compare; + } + } + + @Test + void testReadBatch() { + int keyIndex = 0; + int[] pkIndexes = new int[] {keyIndex}; + List<LogRecord> logRecords1 = createRecords(0, 10, false); + List<LogRecord> logRecords2 = createRecords(10, 10, false); + List<KeyValueRow> logRecords3 = + createRecords(5, 10, true).stream() + .map(logRecord -> new KeyValueRow(pkIndexes, logRecord.getRow(), false)) + .collect(Collectors.toList()); + + SortMergeReader sortMergeReader = + new SortMergeReader( + null, + new int[] {keyIndex}, + Arrays.asList( + CloseableIterator.wrap(logRecords2.iterator()), + CloseableIterator.wrap(logRecords1.iterator())), + new FlussRowComparator(keyIndex), + CloseableIterator.wrap(logRecords3.iterator())); + + List<InternalRow> actualRows = new ArrayList<>(); + InternalRow.FieldGetter[] fieldGetters = + InternalRow.createFieldGetters( + RowType.of(new IntType(), new StringType(), new StringType())); + try (CloseableIterator<InternalRow> iterator = sortMergeReader.readBatch()) { + actualRows.addAll(materializeRows(iterator, fieldGetters)); + } + assertThat(actualRows).hasSize(20); + List<LogRecord> excepted = createRecords(0, 5, false); + excepted.addAll(createRecords(5, 10, true)); + excepted.addAll(createRecords(15, 5, false)); + assertThat(actualRows) + .isEqualTo( + materializeRows( + CloseableIterator.wrap( + excepted.stream().map(LogRecord::getRow).iterator()), + fieldGetters)); + } + + @Test + void testReadBatchWithProjectedFields() { + int keyIndex = 0; + int[] projectedFields = new int[] {keyIndex, 1}; + int[] pkIndexes = new int[] {keyIndex}; + List<LogRecord> logRecords1 = createRecords(0, 10, false); + List<LogRecord> logRecords2 = createRecords(10, 10, false); + List<KeyValueRow> logRecords3 = + createRecords(5, 10, true).stream() + .map(logRecord -> new KeyValueRow(pkIndexes, logRecord.getRow(), false)) + .collect(Collectors.toList()); + + SortMergeReader sortMergeReader = + new SortMergeReader( + projectedFields, + new int[] {keyIndex}, + Arrays.asList( + CloseableIterator.wrap(logRecords2.iterator()), + CloseableIterator.wrap(logRecords1.iterator())), + new FlussRowComparator(keyIndex), + CloseableIterator.wrap(logRecords3.iterator())); + + List<InternalRow> actualRows = new ArrayList<>(); + InternalRow.FieldGetter[] fieldGetters = + InternalRow.createFieldGetters(RowType.of(new IntType(), new StringType())); + try (CloseableIterator<InternalRow> iterator = sortMergeReader.readBatch()) { + actualRows.addAll(materializeRows(iterator, fieldGetters)); + } + assertThat(actualRows).hasSize(20); + List<LogRecord> excepted = createRecords(0, 5, false); Review Comment: nit: ```suggestion List<LogRecord> expected = createRecords(0, 5, false); ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.client.table.scanner.log.LogScanner; +import com.alibaba.fluss.client.table.scanner.log.ScanRecords; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** A scanner to merge the lakehouse's snapshot and change log. 
*/ +public class LakeSnapshotAndLogSplitScanner implements BatchScanner { + + private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private Comparator<InternalRow> rowComparator; + private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>(); + private final LakeSource<LakeSplit> lakeSource; + + private final int[] pkIndexes; + + // the indexes of primary key in emitted row by paimon and fluss + private int[] keyIndexesInRow; + @Nullable private int[] adjustProjectedFields; + private int[] newProjectedFields; + + // the sorted logs in memory, mapping from key -> value + private SortedMap<InternalRow, KeyValueRow> logRows; + + private final LogScanner logScanner; + private final long stoppingOffset; + private boolean logScanFinished; + + private SortMergeReader currentSortMergeReader; + + public LakeSnapshotAndLogSplitScanner( + Table table, + LakeSource<LakeSplit> lakeSource, + LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit) { + this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); + this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSource = lakeSource; + this.newProjectedFields = getNeedProjectFields(table); + + this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); + this.lakeSource.withProject( + Arrays.stream(newProjectedFields) + .mapToObj(field -> new int[] {field}) + .toArray(int[][]::new)); + + TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); + if (tableBucket.getPartitionId() != null) { + this.logScanner.subscribe( + tableBucket.getPartitionId(), + tableBucket.getBucket(), + lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } else { + this.logScanner.subscribe( + tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } + + this.stoppingOffset = + lakeSnapshotAndFlussLogSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSnapshotAndFlussLogSplit)); + + this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset; + } + + private int[] getNeedProjectFields(Table flussTable) { + if (lakeSource.getProject() != null) { + int[] projectedFields = + Arrays.stream(lakeSource.getProject()).mapToInt(field -> field[0]).toArray(); + // we need to include the primary key in projected fields to sort merge by pk + // if the provided don't include, we need to include it + List<Integer> newProjectedFields = + Arrays.stream(projectedFields).boxed().collect(Collectors.toList()); + + // the indexes of primary key with new projected fields + keyIndexesInRow = new int[pkIndexes.length]; + for (int i = 0; i < pkIndexes.length; i++) { + int primaryKeyIndex = pkIndexes[i]; + // search the pk in projected fields + int indexInProjectedFields = findIndex(projectedFields, primaryKeyIndex); + if (indexInProjectedFields >= 0) { + keyIndexesInRow[i] = indexInProjectedFields; + } else { + // no pk in projected fields, we must include it to do + // merge sort + newProjectedFields.add(primaryKeyIndex); + keyIndexesInRow[i] = newProjectedFields.size() - 1; + } + } + int[] newProjection = newProjectedFields.stream().mapToInt(Integer::intValue).toArray(); + // the underlying scan will use the new projection to scan data, + // but will still need to map from the new projection to the origin projected fields + int[] adjustProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + 
adjustProjectedFields[i] = findIndex(newProjection, projectedFields[i]); + } + this.adjustProjectedFields = adjustProjectedFields; + return newProjection; + } else { + // no projectedFields, use all fields + keyIndexesInRow = pkIndexes; + return IntStream.range(0, flussTable.getTableInfo().getRowType().getFieldCount()) + .toArray(); + } + } + + private int findIndex(int[] array, int target) { + int index = -1; + for (int i = 0; i < array.length; i++) { + if (array[i] == target) { + index = i; + break; + } + } + return index; + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + if (logScanFinished) { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); + } + for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { Review Comment: ``` else { for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { lakeRecordIterators.add( lakeSource.createRecordReader(() -> lakeSplit).read()); } } ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.client.table.scanner.log.LogScanner; +import com.alibaba.fluss.client.table.scanner.log.ScanRecords; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** A scanner to merge the lakehouse's snapshot and change log. 
*/ +public class LakeSnapshotAndLogSplitScanner implements BatchScanner { + + private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private Comparator<InternalRow> rowComparator; + private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>(); + private final LakeSource<LakeSplit> lakeSource; + + private final int[] pkIndexes; + + // the indexes of primary key in emitted row by paimon and fluss + private int[] keyIndexesInRow; + @Nullable private int[] adjustProjectedFields; + private int[] newProjectedFields; + + // the sorted logs in memory, mapping from key -> value + private SortedMap<InternalRow, KeyValueRow> logRows; + + private final LogScanner logScanner; + private final long stoppingOffset; + private boolean logScanFinished; + + private SortMergeReader currentSortMergeReader; + + public LakeSnapshotAndLogSplitScanner( + Table table, + LakeSource<LakeSplit> lakeSource, + LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit) { + this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); + this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSource = lakeSource; + this.newProjectedFields = getNeedProjectFields(table); + + this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); + this.lakeSource.withProject( + Arrays.stream(newProjectedFields) + .mapToObj(field -> new int[] {field}) + .toArray(int[][]::new)); + + TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); + if (tableBucket.getPartitionId() != null) { + this.logScanner.subscribe( + tableBucket.getPartitionId(), + tableBucket.getBucket(), + lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } else { + this.logScanner.subscribe( + tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } + + this.stoppingOffset = + lakeSnapshotAndFlussLogSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSnapshotAndFlussLogSplit)); + + this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset; + } + + private int[] getNeedProjectFields(Table flussTable) { + if (lakeSource.getProject() != null) { + int[] projectedFields = + Arrays.stream(lakeSource.getProject()).mapToInt(field -> field[0]).toArray(); + // we need to include the primary key in projected fields to sort merge by pk + // if the provided don't include, we need to include it + List<Integer> newProjectedFields = + Arrays.stream(projectedFields).boxed().collect(Collectors.toList()); + + // the indexes of primary key with new projected fields + keyIndexesInRow = new int[pkIndexes.length]; + for (int i = 0; i < pkIndexes.length; i++) { + int primaryKeyIndex = pkIndexes[i]; + // search the pk in projected fields + int indexInProjectedFields = findIndex(projectedFields, primaryKeyIndex); + if (indexInProjectedFields >= 0) { + keyIndexesInRow[i] = indexInProjectedFields; + } else { + // no pk in projected fields, we must include it to do + // merge sort + newProjectedFields.add(primaryKeyIndex); + keyIndexesInRow[i] = newProjectedFields.size() - 1; + } + } + int[] newProjection = newProjectedFields.stream().mapToInt(Integer::intValue).toArray(); + // the underlying scan will use the new projection to scan data, + // but will still need to map from the new projection to the origin projected fields + int[] adjustProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + 
adjustProjectedFields[i] = findIndex(newProjection, projectedFields[i]); + } + this.adjustProjectedFields = adjustProjectedFields; + return newProjection; + } else { + // no projectedFields, use all fields + keyIndexesInRow = pkIndexes; + return IntStream.range(0, flussTable.getTableInfo().getRowType().getFieldCount()) + .toArray(); + } + } + + private int findIndex(int[] array, int target) { + int index = -1; + for (int i = 0; i < array.length; i++) { + if (array[i] == target) { + index = i; + break; + } + } + return index; + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + if (logScanFinished) { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); + } + for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + lakeRecordIterators.add(lakeSource.createRecordReader(() -> lakeSplit).read()); + } + } + if (currentSortMergeReader == null) { + currentSortMergeReader = + new SortMergeReader( + adjustProjectedFields, + keyIndexesInRow, + lakeRecordIterators, + rowComparator, + CloseableIterator.wrap( + logRows == null + ? Collections.emptyIterator() + : logRows.values().iterator())); + } + return currentSortMergeReader.readBatch(); + } else { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); Review Comment: Remember accept this suggestion ########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.TableBucket; + +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; + +import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Test case for {@link LakeSplitSerializer}. */ +public class LakeSplitSerializerTest { + private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1; + + private final SimpleVersionedSerializer<LakeSplit> mockSourceSerializer = + Mockito.mock(SimpleVersionedSerializer.class); + + @Mock private LakeSplit mockLakeSplit = Mockito.mock(LakeSplit.class); + + @Mock private TableBucket mockTableBucket = Mockito.mock(TableBucket.class); + + private final LakeSplitSerializer serializer = new LakeSplitSerializer(mockSourceSerializer); + + @Test + void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException { + // prepare test data Review Comment: Remember to accept this suggestion ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java: ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** A sort merge reader to merge lakehouse snapshot record and fluss change log. */ +class SortMergeReader { + + private final ProjectedRow snapshotProjectedPkRow; + private final CloseableIterator<LogRecord> lakeRecordIterator; + private final Comparator<InternalRow> userKeyComparator; + private CloseableIterator<KeyValueRow> changeLogIterator; + + private final SnapshotMergedRowIteratorWrapper snapshotMergedRowIteratorWrapper; + + private final ChangeLogIteratorWrapper changeLogIteratorWrapper; + private @Nullable final ProjectedRow projectedRow; + + public SortMergeReader( + @Nullable int[] projectedFields, + int[] pkIndexes, + List<CloseableIterator<LogRecord>> lakeRecordIterators, + Comparator<InternalRow> userKeyComparator, + CloseableIterator<KeyValueRow> changeLogIterator) { + this.userKeyComparator = userKeyComparator; + this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes); + this.lakeRecordIterator = + ConcatRecordIterator.wrap(lakeRecordIterators, userKeyComparator, pkIndexes); + this.changeLogIterator = changeLogIterator; + this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper(); + this.snapshotMergedRowIteratorWrapper = new SnapshotMergedRowIteratorWrapper(); + // to project to fields provided by user + this.projectedRow = projectedFields == null ? null : ProjectedRow.from(projectedFields); + } + + @Nullable + public CloseableIterator<InternalRow> readBatch() { + if (!lakeRecordIterator.hasNext()) { + return changeLogIterator.hasNext() + ? changeLogIteratorWrapper.replace(changeLogIterator) + : null; + } else { + CloseableIterator<SortMergeRows> mergedRecordIterator = + transform(lakeRecordIterator, this::sortMergeWithChangeLog); + + return snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator); + } + } + + /** A concat record iterator to concat multiple record iterator. */ + private static class ConcatRecordIterator implements CloseableIterator<LogRecord> { + private final Queue<CloseableIterator<LogRecord>> iteratorQueue; + private final ProjectedRow snapshotProjectedPkRow1; + private final ProjectedRow snapshotProjectedPkRow2; + + public ConcatRecordIterator( + List<CloseableIterator<LogRecord>> iteratorList, + int[] pkIndexes, + Comparator<InternalRow> comparator) { + this.snapshotProjectedPkRow1 = ProjectedRow.from(pkIndexes); + this.snapshotProjectedPkRow2 = ProjectedRow.from(pkIndexes); + this.iteratorQueue = Review Comment: Can we use `PriorityQueue`? It's more robust and makes it more like a merge sort.
``` private static class ConcatRecordIterator implements CloseableIterator<LogRecord> { private final PriorityQueue<SingleElementHeadIterator<LogRecord>> priorityQueue; private final ProjectedRow snapshotProjectedPkRow1; private final ProjectedRow snapshotProjectedPkRow2; public ConcatRecordIterator( List<CloseableIterator<LogRecord>> iteratorList, int[] pkIndexes, Comparator<InternalRow> comparator) { this.snapshotProjectedPkRow1 = ProjectedRow.from(pkIndexes); this.snapshotProjectedPkRow2 = ProjectedRow.from(pkIndexes); this.priorityQueue = new PriorityQueue<>( Math.max(1, iteratorList.size()), (s1, s2) -> comparator.compare( getComparableRow(s1, snapshotProjectedPkRow1), getComparableRow(s2, snapshotProjectedPkRow2))); iteratorList.stream() .filter(Iterator::hasNext) .map( iterator -> SingleElementHeadIterator.addElementToHead( iterator.next(), iterator)) .forEach(priorityQueue::add); } public static CloseableIterator<LogRecord> wrap( List<CloseableIterator<LogRecord>> iteratorList, Comparator<InternalRow> comparator, int[] pkIndexes) { if (iteratorList.isEmpty()) { return CloseableIterator.wrap(Collections.emptyIterator()); } return new ConcatRecordIterator(iteratorList, pkIndexes, comparator); } private InternalRow getComparableRow( SingleElementHeadIterator<LogRecord> iterator, ProjectedRow projectedRow) { return projectedRow.replaceRow(iterator.peek().getRow()); } @Override public void close() { while (!priorityQueue.isEmpty()) { priorityQueue.poll().close(); } } @Override public boolean hasNext() { while (!priorityQueue.isEmpty()) { CloseableIterator<LogRecord> iterator = priorityQueue.peek(); if (iterator.hasNext()) { return true; } priorityQueue.poll().close(); } return false; } @Override public LogRecord next() { if (!hasNext()) { throw new NoSuchElementException(); } return priorityQueue.peek().next(); } } ``` ########## fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSource.java: ########## @@ -50,6 +50,14 @@ public interface LakeSource<Split extends LakeSplit> extends Serializable { */ void withProject(int[][] project); + /** + * Returns the column projection applied to the data source. + * + * @return The column projection applied to the data source. Returns null if no projection has + * been set. + */ + int[][] getProject(); Review Comment: Do we really need this method? We can pass `projectedFields` into `LakeSnapshotAndLogSplitScanner`, right? ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java: ########## @@ -264,12 +239,9 @@ private List<SourceSplitBase> generateSplit( new TableBucket(tableInfo.getTableId(), partitionId, bucket); Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket); long stoppingOffset = bucketEndOffset.get(bucket); - FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator(); - splits.add( generateSplitForPrimaryKeyTableBucket( - fileStoreTable, - splitGenerator, + lakeSplits.get(bucket), Review Comment: what if `lakeSplits` is null? ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.client.table.scanner.log.LogScanner; +import com.alibaba.fluss.client.table.scanner.log.ScanRecords; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** A scanner to merge the lakehouse's snapshot and change log. */ +public class LakeSnapshotAndLogSplitScanner implements BatchScanner { + + private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private Comparator<InternalRow> rowComparator; + private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>(); + private final LakeSource<LakeSplit> lakeSource; + + private final int[] pkIndexes; + + // the indexes of primary key in emitted row by paimon and fluss + private int[] keyIndexesInRow; + @Nullable private int[] adjustProjectedFields; + private int[] newProjectedFields; + + // the sorted logs in memory, mapping from key -> value + private SortedMap<InternalRow, KeyValueRow> logRows; + + private final LogScanner logScanner; + private final long stoppingOffset; + private boolean logScanFinished; + + private SortMergeReader currentSortMergeReader; + + public LakeSnapshotAndLogSplitScanner( + Table table, + LakeSource<LakeSplit> lakeSource, + LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit) { + this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); + this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSource = lakeSource; + this.newProjectedFields = getNeedProjectFields(table); + + this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); + this.lakeSource.withProject( + Arrays.stream(newProjectedFields) + .mapToObj(field -> new int[] {field}) + .toArray(int[][]::new)); + + TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); + if (tableBucket.getPartitionId() != null) { + this.logScanner.subscribe( + tableBucket.getPartitionId(), + tableBucket.getBucket(), + lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } else { + this.logScanner.subscribe( + tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } + + this.stoppingOffset = + lakeSnapshotAndFlussLogSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSnapshotAndFlussLogSplit)); + + this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset; + } + + private int[] getNeedProjectFields(Table flussTable) { + if (lakeSource.getProject() != null) { + int[] 
projectedFields =
+                    Arrays.stream(lakeSource.getProject()).mapToInt(field -> field[0]).toArray();
+            // we need to include the primary key in projected fields to sort merge by pk
+            // if the provided don't include, we need to include it
+            List<Integer> newProjectedFields =
+                    Arrays.stream(projectedFields).boxed().collect(Collectors.toList());
+
+            // the indexes of primary key with new projected fields
+            keyIndexesInRow = new int[pkIndexes.length];
+            for (int i = 0; i < pkIndexes.length; i++) {
+                int primaryKeyIndex = pkIndexes[i];
+                // search the pk in projected fields
+                int indexInProjectedFields = findIndex(projectedFields, primaryKeyIndex);
+                if (indexInProjectedFields >= 0) {
+                    keyIndexesInRow[i] = indexInProjectedFields;
+                } else {
+                    // no pk in projected fields, we must include it to do
+                    // merge sort
+                    newProjectedFields.add(primaryKeyIndex);
+                    keyIndexesInRow[i] = newProjectedFields.size() - 1;
+                }
+            }
+            int[] newProjection = newProjectedFields.stream().mapToInt(Integer::intValue).toArray();
+            // the underlying scan will use the new projection to scan data,
+            // but will still need to map from the new projection to the origin projected fields
+            int[] adjustProjectedFields = new int[projectedFields.length];
+            for (int i = 0; i < projectedFields.length; i++) {
+                adjustProjectedFields[i] = findIndex(newProjection, projectedFields[i]);
+            }
+            this.adjustProjectedFields = adjustProjectedFields;
+            return newProjection;
+        } else {
+            // no projectedFields, use all fields
+            keyIndexesInRow = pkIndexes;
+            return IntStream.range(0, flussTable.getTableInfo().getRowType().getFieldCount())
+                    .toArray();
+        }
+    }
+
+    private int findIndex(int[] array, int target) {
+        int index = -1;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] == target) {
+                index = i;
+                break;
+            }
+        }
+        return index;
+    }
+
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
+        if (logScanFinished) {
+            if (lakeRecordIterators.isEmpty()) {
+                if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
+                        || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
+                    lakeRecordIterators =
+                            Collections.singletonList(CloseableIterator.emptyIterator());

Review Comment:
   Remember to accept this suggestion.



########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java: ##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Test case for {@link LakeSplitSerializer}. */
+public class LakeSplitSerializerTest {
+    private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
+
+    private final SimpleVersionedSerializer<LakeSplit> mockSourceSerializer =
+            Mockito.mock(SimpleVersionedSerializer.class);

Review Comment:
   Don't use `mockito`. It makes the code hard to maintain.



########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java: ##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.client.table.scanner.log.LogScanner; +import com.alibaba.fluss.client.table.scanner.log.ScanRecords; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** A scanner to merge the lakehouse's snapshot and change log. */ +public class LakeSnapshotAndLogSplitScanner implements BatchScanner { + + private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private Comparator<InternalRow> rowComparator; + private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>(); + private final LakeSource<LakeSplit> lakeSource; + + private final int[] pkIndexes; + + // the indexes of primary key in emitted row by paimon and fluss + private int[] keyIndexesInRow; + @Nullable private int[] adjustProjectedFields; + private int[] newProjectedFields; + + // the sorted logs in memory, mapping from key -> value + private SortedMap<InternalRow, KeyValueRow> logRows; + + private final LogScanner logScanner; + private final long stoppingOffset; + private boolean logScanFinished; + + private SortMergeReader currentSortMergeReader; + + public LakeSnapshotAndLogSplitScanner( + Table table, + LakeSource<LakeSplit> lakeSource, + LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit) { + this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); + this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSource = lakeSource; + this.newProjectedFields = getNeedProjectFields(table); + + this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); + this.lakeSource.withProject( + Arrays.stream(newProjectedFields) + .mapToObj(field -> new int[] {field}) + .toArray(int[][]::new)); + + TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); + if (tableBucket.getPartitionId() != null) { + this.logScanner.subscribe( + tableBucket.getPartitionId(), + tableBucket.getBucket(), + lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } else { + this.logScanner.subscribe( + tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } + + this.stoppingOffset = + lakeSnapshotAndFlussLogSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSnapshotAndFlussLogSplit)); + + this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset; + } + + private int[] getNeedProjectFields(Table flussTable) { + if (lakeSource.getProject() != null) { + int[] 
projectedFields = + Arrays.stream(lakeSource.getProject()).mapToInt(field -> field[0]).toArray(); + // we need to include the primary key in projected fields to sort merge by pk + // if the provided don't include, we need to include it + List<Integer> newProjectedFields = + Arrays.stream(projectedFields).boxed().collect(Collectors.toList()); + + // the indexes of primary key with new projected fields + keyIndexesInRow = new int[pkIndexes.length]; + for (int i = 0; i < pkIndexes.length; i++) { + int primaryKeyIndex = pkIndexes[i]; + // search the pk in projected fields + int indexInProjectedFields = findIndex(projectedFields, primaryKeyIndex); + if (indexInProjectedFields >= 0) { + keyIndexesInRow[i] = indexInProjectedFields; + } else { + // no pk in projected fields, we must include it to do + // merge sort + newProjectedFields.add(primaryKeyIndex); + keyIndexesInRow[i] = newProjectedFields.size() - 1; + } + } + int[] newProjection = newProjectedFields.stream().mapToInt(Integer::intValue).toArray(); + // the underlying scan will use the new projection to scan data, + // but will still need to map from the new projection to the origin projected fields + int[] adjustProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + adjustProjectedFields[i] = findIndex(newProjection, projectedFields[i]); + } + this.adjustProjectedFields = adjustProjectedFields; + return newProjection; + } else { + // no projectedFields, use all fields + keyIndexesInRow = pkIndexes; + return IntStream.range(0, flussTable.getTableInfo().getRowType().getFieldCount()) + .toArray(); + } + } + + private int findIndex(int[] array, int target) { + int index = -1; + for (int i = 0; i < array.length; i++) { + if (array[i] == target) { + index = i; + break; + } + } + return index; + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + if (logScanFinished) { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); + } + for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + lakeRecordIterators.add(lakeSource.createRecordReader(() -> lakeSplit).read()); + } + } + if (currentSortMergeReader == null) { + currentSortMergeReader = + new SortMergeReader( + adjustProjectedFields, + keyIndexesInRow, + lakeRecordIterators, + rowComparator, + CloseableIterator.wrap( + logRows == null + ? Collections.emptyIterator() + : logRows.values().iterator())); + } + return currentSortMergeReader.readBatch(); + } else { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); + } + for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + RecordReader reader = lakeSource.createRecordReader(() -> lakeSplit); + if (reader instanceof SortedRecordReader) { + rowComparator = ((SortedRecordReader) reader).order(); + } else { + throw new UnsupportedOperationException( + "lake records must instance of sorted view."); + } + lakeRecordIterators.add(reader.read()); + } + logRows = new TreeMap<>(rowComparator); Review Comment: if no lake splits, `rowComparator` will be null. 
According to the java doc, ``` Constructs a new, empty tree map, using the natural ordering of its keys. All keys inserted into the map must implement the Comparable interface. Furthermore, all such keys must be mutually comparable: k1.compareTo(k2) must not throw a ClassCastException for any keys k1 and k2 in the map ``` if no lake splits, we don't care about the `rowComparator`, any `rowComparator` should be fine. ########## fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.TableBucket; + +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; + +import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Test case for {@link LakeSplitSerializer}. */ +public class LakeSplitSerializerTest { Review Comment: nit: ```suggestion class LakeSplitSerializerTest { ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.client.table.scanner.log.LogScanner; +import com.alibaba.fluss.client.table.scanner.log.ScanRecords; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** A scanner to merge the lakehouse's snapshot and change log. */ +public class LakeSnapshotAndLogSplitScanner implements BatchScanner { + + private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private Comparator<InternalRow> rowComparator; + private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>(); + private final LakeSource<LakeSplit> lakeSource; + + private final int[] pkIndexes; + + // the indexes of primary key in emitted row by paimon and fluss + private int[] keyIndexesInRow; + @Nullable private int[] adjustProjectedFields; + private int[] newProjectedFields; + + // the sorted logs in memory, mapping from key -> value + private SortedMap<InternalRow, KeyValueRow> logRows; + + private final LogScanner logScanner; + private final long stoppingOffset; + private boolean logScanFinished; + + private SortMergeReader currentSortMergeReader; + + public LakeSnapshotAndLogSplitScanner( + Table table, + LakeSource<LakeSplit> lakeSource, + LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit) { + this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); + this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSource = lakeSource; + this.newProjectedFields = getNeedProjectFields(table); + + this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); + this.lakeSource.withProject( + Arrays.stream(newProjectedFields) + .mapToObj(field -> new int[] {field}) + .toArray(int[][]::new)); + + TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); + if (tableBucket.getPartitionId() != null) { + this.logScanner.subscribe( + tableBucket.getPartitionId(), + tableBucket.getBucket(), + lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } else { + this.logScanner.subscribe( + tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + } + + this.stoppingOffset = + lakeSnapshotAndFlussLogSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSnapshotAndFlussLogSplit)); + + this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset; + } + 
+ private int[] getNeedProjectFields(Table flussTable) { + if (lakeSource.getProject() != null) { + int[] projectedFields = + Arrays.stream(lakeSource.getProject()).mapToInt(field -> field[0]).toArray(); + // we need to include the primary key in projected fields to sort merge by pk + // if the provided don't include, we need to include it + List<Integer> newProjectedFields = + Arrays.stream(projectedFields).boxed().collect(Collectors.toList()); + + // the indexes of primary key with new projected fields + keyIndexesInRow = new int[pkIndexes.length]; + for (int i = 0; i < pkIndexes.length; i++) { + int primaryKeyIndex = pkIndexes[i]; + // search the pk in projected fields + int indexInProjectedFields = findIndex(projectedFields, primaryKeyIndex); + if (indexInProjectedFields >= 0) { + keyIndexesInRow[i] = indexInProjectedFields; + } else { + // no pk in projected fields, we must include it to do + // merge sort + newProjectedFields.add(primaryKeyIndex); + keyIndexesInRow[i] = newProjectedFields.size() - 1; + } + } + int[] newProjection = newProjectedFields.stream().mapToInt(Integer::intValue).toArray(); + // the underlying scan will use the new projection to scan data, + // but will still need to map from the new projection to the origin projected fields + int[] adjustProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + adjustProjectedFields[i] = findIndex(newProjection, projectedFields[i]); + } + this.adjustProjectedFields = adjustProjectedFields; + return newProjection; + } else { + // no projectedFields, use all fields + keyIndexesInRow = pkIndexes; + return IntStream.range(0, flussTable.getTableInfo().getRowType().getFieldCount()) + .toArray(); + } + } + + private int findIndex(int[] array, int target) { + int index = -1; + for (int i = 0; i < array.length; i++) { + if (array[i] == target) { + index = i; + break; + } + } + return index; + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + if (logScanFinished) { + if (lakeRecordIterators.isEmpty()) { + if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null + || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + lakeRecordIterators = + Collections.singletonList(CloseableIterator.emptyIterator()); + } + for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + lakeRecordIterators.add(lakeSource.createRecordReader(() -> lakeSplit).read()); + } + } + if (currentSortMergeReader == null) { + currentSortMergeReader = + new SortMergeReader( + adjustProjectedFields, + keyIndexesInRow, + lakeRecordIterators, + rowComparator, + CloseableIterator.wrap( + logRows == null + ? 
Collections.emptyIterator()
+                                    : logRows.values().iterator()));
+            }
+            return currentSortMergeReader.readBatch();
+        } else {
+            if (lakeRecordIterators.isEmpty()) {
+                if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
+                        || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
+                    lakeRecordIterators =
+                            Collections.singletonList(CloseableIterator.emptyIterator());
+                }
+                for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
+                    RecordReader reader = lakeSource.createRecordReader(() -> lakeSplit);
+                    if (reader instanceof SortedRecordReader) {
+                        rowComparator = ((SortedRecordReader) reader).order();
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "lake records must instance of sorted view.");
+                    }
+                    lakeRecordIterators.add(reader.read());
+                }
+                logRows = new TreeMap<>(rowComparator);
+            }
+            pollLogRecords(timeout);
+            return CloseableIterator.wrap(Collections.emptyIterator());
+        }
+    }
+
+    private void pollLogRecords(Duration timeout) {
+        ScanRecords scanRecords = logScanner.poll(timeout);
+        for (ScanRecord scanRecord : scanRecords) {
+            boolean isDelete =
+                    scanRecord.getChangeType() == ChangeType.DELETE
+                            || scanRecord.getChangeType() == ChangeType.UPDATE_BEFORE;
+            KeyValueRow keyValueRow =
+                    new KeyValueRow(keyIndexesInRow, scanRecord.getRow(), isDelete);
+            InternalRow keyRow = keyValueRow.keyRow();
+            // upsert the key value row
+            logRows.put(keyRow, keyValueRow);
+            if (scanRecord.logOffset() >= stoppingOffset - 1) {
+                // has reached to the end
+                logScanFinished = true;
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (logScanner != null) {
+                logScanner.close();

Review Comment:
   also close `lakeRecordIterators`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
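
Following up on the last review comment, a minimal sketch of what releasing the lake-side iterators in `close()` could look like. This is not the PR's actual code (the existing `close()` body is truncated in the quote above), and the exception handling is an assumption about how `LogScanner#close()` and `CloseableIterator#close()` may fail:

```java
@Override
public void close() throws IOException {
    try {
        if (logScanner != null) {
            logScanner.close();
        }
    } catch (Exception e) {
        // assumption: wrap any failure from the log scanner into the declared IOException
        throw new IOException("Failed to close the log scanner.", e);
    } finally {
        // Also release the lake-side readers so their underlying file handles are freed,
        // even if closing the log scanner failed.
        for (CloseableIterator<LogRecord> lakeRecordIterator : lakeRecordIterators) {
            try {
                lakeRecordIterator.close();
            } catch (Exception e) {
                // best effort: keep closing the remaining iterators
            }
        }
        lakeRecordIterators.clear();
    }
}
```

Clearing the list afterwards also keeps `pollBatch` from accidentally reusing already-closed iterators if it were ever invoked after `close()`.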