luoyuxia commented on code in PR #1505:
URL: https://github.com/apache/fluss/pull/1505#discussion_r2265574875
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java:
##########
@@ -124,39 +98,14 @@ public long getLong(int pos) {
return internalRow.getLong(pos);
}
- @Override
- public float getFloat(int pos) {
- return internalRow.getFloat(pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return internalRow.getDouble(pos);
- }
-
- @Override
- public BinaryString getString(int pos) {
- return BinaryString.fromBytes(internalRow.getString(pos).toBytes());
- }
-
- @Override
- public Decimal getDecimal(int pos, int precision, int scale) {
-        com.alibaba.fluss.row.Decimal flussDecimal = internalRow.getDecimal(pos, precision, scale);
-        if (flussDecimal.isCompact()) {
-            return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale);
-        } else {
-            return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale);
-        }
-    }
-
@Override
public Timestamp getTimestamp(int pos, int precision) {
// it's timestamp system column
if (pos == originRowFieldCount + 2) {
return Timestamp.fromEpochMillis(logRecord.timestamp());
}
- DataType paimonTimestampType = tableTowType.getTypeAt(pos);
+ DataType paimonTimestampType = tableRowType.getTypeAt(pos);
Review Comment:
Can be `return super.getTimestamp(pos, precision);`
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/FlussRowAsPaimonRow.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+/** Adapter class for converting Fluss row to Paimon row. */
+public class FlussRowAsPaimonRow implements InternalRow {
Review Comment:
nit:
```suggestion
class FlussRowAsPaimonRow implements InternalRow {
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonLakeSource.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.predicate.Predicate;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/**
+ * Paimon Lake format implementation of {@link com.alibaba.fluss.lake.source.LakeSource} for
+ * reading paimon table.
+ */
+public class PaimonLakeSource implements LakeSource<PaimonSplit> {
+
+ private final Configuration paimonConfig;
+ private final TablePath tablePath;
+
+ private @Nullable int[][] project;
+ private @Nullable org.apache.paimon.predicate.Predicate predicate;
+
+ public PaimonLakeSource(Configuration paimonConfig, TablePath tablePath) {
+ this.paimonConfig = paimonConfig;
+ this.tablePath = tablePath;
+ }
+
+ @Override
+ public void withProject(int[][] project) {
+ this.project = project;
+ }
+
+ @Override
+ public void withLimit(int limit) {
+ throw new UnsupportedOperationException("Not impl.");
+ }
+
+ @Override
+ public FilterPushDownResult withFilters(List<Predicate> predicates) {
+ throw new UnsupportedOperationException("Not impl.");
Review Comment:
nit: We can always return:
```
return FilterPushDownResult.of(Collections.emptyList(), predicates);
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Record reader for paimon table. */
+public class PaimonRecordReader implements RecordReader {
+
+ protected PaimonRowAsFlussRecordIterator iterator;
+ protected @Nullable int[][] project;
+ protected @Nullable Predicate predicate;
+ protected RowType paimonRowType;
+
+ public PaimonRecordReader(
+ FileStoreTable fileStoreTable,
+ PaimonSplit split,
+ @Nullable int[][] project,
+ @Nullable Predicate predicate)
+ throws IOException {
+
+ ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+ int fieldCount = fileStoreTable.rowType().getFieldCount();
+ List<DataField> pkFields = fileStoreTable.schema().primaryKeysFields();
+ if (project != null) {
+            readBuilder = applyProject(readBuilder, project, fieldCount, pkFields);
+ }
+
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+
+ TableRead tableRead = readBuilder.newRead();
+ paimonRowType = readBuilder.readType();
+
+ org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+ tableRead.createReader(split.dataSplit());
+ iterator =
+ new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+ recordReader.toCloseableIterator(), paimonRowType);
+ }
+
+ @Override
+ public CloseableIterator<LogRecord> read() throws IOException {
+ return iterator;
+ }
+
+    // TODO: Support primary key projection and obtain primary key index for merging.
+ private ReadBuilder applyProject(
+            ReadBuilder readBuilder, int[][] projects, int fieldCount, List<DataField> pkFields) {
Review Comment:
```suggestion
            ReadBuilder readBuilder, int[][] projects, RowType paimonFullRowType, List<DataField> pkFields) {
```
So that we can use
```
int offsetFieldPos = paimonFullRowType.getFieldIndex(OFFSET_COLUMN_NAME);
```
to get `offsetFieldPos` and avoid unexpected errors.
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplitPlanner.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Split panner for paimon table. */
+public class PaimonSplitPlanner implements Planner<PaimonSplit> {
+
+ private final Configuration paimonConfig;
+ private final TablePath tablePath;
+ private final Predicate predicate;
+ private final long snapshotId;
+
+ public PaimonSplitPlanner(
+            Configuration paimonConfig, TablePath tablePath, Predicate predicate, long snapshotId) {
+ this.paimonConfig = paimonConfig;
+ this.tablePath = tablePath;
+ this.predicate = predicate;
+ this.snapshotId = snapshotId;
+ }
+
+ @Override
+ public List<PaimonSplit> plan() {
+ try {
+ List<PaimonSplit> splits = new ArrayList<>();
+ try (Catalog catalog = getCatalog()) {
+                FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId);
+ // if primary key table, only generate splits
+ // to do batch sort merge
+ if (!fileStoreTable.primaryKeys().isEmpty()) {
+ // todo: may need make it passed in context
+ fileStoreTable.copy(
Review Comment:
Do we really need this restriction?
I think we can still generate many paimon splits; these splits will be passed to the same reader.
The readers for the paimon splits are all sorted, so we can still sort-merge across the paimon splits, right?
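The sort-merge over multiple sorted splits described above amounts to a classic k-way merge over sorted iterators. The following standalone sketch illustrates the idea with a priority queue; `sortMerge` and the integer "splits" are hypothetical stand-ins for the per-split sorted readers, not Fluss or Paimon API:

```java
import java.util.*;

public class SortMergeSketch {
    /** Merge several individually sorted iterators into one globally sorted stream. */
    static <T> List<T> sortMerge(List<Iterator<T>> sortedIterators, Comparator<T> comparator) {
        // Min-heap keyed by each iterator's current head element.
        PriorityQueue<Map.Entry<T, Iterator<T>>> heap =
                new PriorityQueue<>((a, b) -> comparator.compare(a.getKey(), b.getKey()));
        for (Iterator<T> it : sortedIterators) {
            if (it.hasNext()) {
                heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            // Pop the globally smallest head, then refill from the same iterator.
            Map.Entry<T, Iterator<T>> smallest = heap.poll();
            merged.add(smallest.getKey());
            Iterator<T> it = smallest.getValue();
            if (it.hasNext()) {
                heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> splits = new ArrayList<>();
        splits.add(Arrays.asList(1, 4, 7).iterator());
        splits.add(Arrays.asList(2, 5, 8).iterator());
        splits.add(Arrays.asList(3, 6, 9).iterator());
        System.out.println(sortMerge(splits, Comparator.naturalOrder()));
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

This only requires that each split's reader is individually sorted, which is why generating many splits need not break the merge.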
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/PaimonLakeHouseTestBase.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.flink;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.paimon.PaimonLakeStorage;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Base class for paimon lakehouse test. */
+public class PaimonLakeHouseTestBase {
+ protected static final String DEFAULT_DB = "fluss_lakehouse";
+ protected static final String DEFAULT_TABLE = "test_lakehouse_table";
+ protected static final int DEFAULT_BUCKET_NUM = 1;
+
+ private static @TempDir File tempWarehouseDir;
+ protected static PaimonLakeStorage lakeStorage;
+ protected static Catalog paimonCatalog;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ Configuration configuration = new Configuration();
+ configuration.setString("type", "paimon");
+ configuration.setString("warehouse", tempWarehouseDir.toString());
+ lakeStorage = new PaimonLakeStorage(configuration);
+ paimonCatalog =
+ CatalogFactory.createCatalog(
+                        CatalogContext.create(Options.fromMap(configuration.toMap())));
+ }
+
+    public void createTable(TablePath tablePath, Schema schema) throws Exception {
+ paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
+ paimonCatalog.createTable(toPaimon(tablePath), schema, true);
+ }
+
+    public void writeRecord(TablePath tablePath, List<InternalRow> records) throws Exception {
Review Comment:
```
public void writeRecord(TablePath tablePath, List<InternalRow> records) throws Exception {
    Table table = getTable(tablePath);
    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
    try (BatchTableWrite writer = writeBuilder.newWrite()) {
        for (InternalRow record : records) {
            writer.write(record);
        }
        List<CommitMessage> messages = writer.prepareCommit();
        try (BatchTableCommit commit = writeBuilder.newCommit()) {
            commit.commit(messages);
        }
    }
}
```
to make the IDE happy.
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSortedRecordReaderTest.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.paimon.utils.FlussConversions.convertToFlinkRow;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSortedRecordReader}. */
+public class PaimonSortedRecordReaderTest extends PaimonLakeHouseTestBase {
+ @BeforeAll
+ protected static void beforeAll() {
+ PaimonLakeHouseTestBase.beforeAll();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testReadPkTable(boolean isPartitioned) throws Exception {
+ // first of all, create table and prepare data
+        String tableName = "logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
Review Comment:
```suggestion
        String tableName = "pkTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSortedRecordReaderTest.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.paimon.utils.FlussConversions.convertToFlinkRow;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSortedRecordReader}. */
+public class PaimonSortedRecordReaderTest extends PaimonLakeHouseTestBase {
+ @BeforeAll
+ protected static void beforeAll() {
+ PaimonLakeHouseTestBase.beforeAll();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testReadPkTable(boolean isPartitioned) throws Exception {
+ // first of all, create table and prepare data
+        String tableName = "logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+        preparePkTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM, writtenRows);
+
+        LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+        List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Row> actual = new ArrayList<>();
+
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(getFlussRowType(isPartitioned));
+ for (PaimonSplit paimonSplit : paimonSplits) {
+            RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
+            assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next().getRow();
+ actual.add(convertToFlinkRow(row, fieldGetters));
+ }
+ iterator.close();
+ }
+ List<Row> expectRows =
+ writtenRows.stream()
+ .map(r -> convertToFlinkRow(r, fieldGetters))
+ .collect(Collectors.toList());
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+
+        ArrayList<InternalRow> logRows = new ArrayList<>();
+        prepareFlussLogRows(5, isPartitioned ? "test" : null, logRows);
+        ArrayList<Row> expectedRows = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            GenericRow row =
+                    isPartitioned
+                            ? row(
+                                    i + 30,
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    400L,
+                                    500.1f,
+                                    600.0d,
+                                    BinaryString.fromString(i < 5 ? "update_string_" + i : "another_string_" + i),
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    BinaryString.fromString("test"),
+                                    0,
+                                    (long) 10 + i,
+                                    TimestampNtz.fromMillis(System.currentTimeMillis()))
+                            : row(
+                                    i + 30,
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    400L,
+                                    500.1f,
+                                    600.0d,
+                                    BinaryString.fromString(i < 5 ? "update_string_" + i : "another_string_" + i),
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    0,
+                                    (long) 10 + i,
+                                    TimestampNtz.fromMillis(System.currentTimeMillis()));
+            expectedRows.add(convertToFlinkRow(row, fieldGetters));
+        }
+
+        List<Row> actualMergeRows =
+                mergeReadLakeRows(lakeSource, paimonSplits, fieldGetters, logRows);
+
+        assertThat(actualMergeRows).containsExactlyInAnyOrderElementsOf(expectedRows);
Review Comment:
Why do we need to test `actualMergeRows`? It looks a little complex.
I think we only need to check that the rows read by `SortedRecordReader` are ordered according to the comparator returned by `SortedRecordReader#order()`.
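The ordering check suggested here could be as simple as the following sketch; the `isSorted` helper and the `int[]` rows are illustrative stand-ins, not the project's test utilities:

```java
import java.util.*;

public class SortedCheckSketch {
    /** Returns true if rows appear in non-decreasing order under the given comparator. */
    static <T> boolean isSorted(List<T> rows, Comparator<T> order) {
        for (int i = 1; i < rows.size(); i++) {
            if (order.compare(rows.get(i - 1), rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Rows are (key, value) pairs; the comparator orders by key, mirroring
        // the comparator a SortedRecordReader would expose via order().
        Comparator<int[]> byKey = Comparator.comparingInt(r -> r[0]);
        List<int[]> rows = Arrays.asList(new int[] {1, 10}, new int[] {2, 20}, new int[] {5, 30});
        System.out.println(isSorted(rows, byKey)); // prints "true"
    }
}
```

AssertJ's `isSortedAccordingTo(comparator)` would express the same assertion directly in the test.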
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Record reader for paimon table. */
+public class PaimonRecordReader implements RecordReader {
+
+ protected PaimonRowAsFlussRecordIterator iterator;
+ protected @Nullable int[][] project;
+ protected @Nullable Predicate predicate;
+ protected RowType paimonRowType;
+
+ public PaimonRecordReader(
+ FileStoreTable fileStoreTable,
+ PaimonSplit split,
+ @Nullable int[][] project,
+ @Nullable Predicate predicate)
+ throws IOException {
+
Review Comment:
nit: remove this blank line.
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplit.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.source.LakeSplit;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.source.DataSplit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Split for paimon table. */
+public class PaimonSplit implements LakeSplit {
+
+ private final DataSplit dataSplit;
+
+ public PaimonSplit(DataSplit dataSplit) {
+ this.dataSplit = dataSplit;
+ }
+
+ @Override
+ public int bucket() {
+ return dataSplit.bucket();
+ }
+
+ @Override
+ public List<String> partition() {
+ BinaryRow partition = dataSplit.partition();
+ if (partition.getFieldCount() == 0) {
+ return Collections.emptyList();
+ }
+
+ List<String> partitions = new ArrayList<>();
+ for (int i = 0; i < partition.getFieldCount(); i++) {
+ partitions.add(partition.getString(i).toString());
Review Comment:
nit:
add comment:
```
// TODO: Currently, partition column must be String datatype, so we can always
// consider it as string. Revisit here when #489 is finished.
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Record reader for paimon table. */
+public class PaimonRecordReader implements RecordReader {
+
+ protected PaimonRowAsFlussRecordIterator iterator;
+ protected @Nullable int[][] project;
+ protected @Nullable Predicate predicate;
+ protected RowType paimonRowType;
+
+ public PaimonRecordReader(
+ FileStoreTable fileStoreTable,
+ PaimonSplit split,
+ @Nullable int[][] project,
+ @Nullable Predicate predicate)
+ throws IOException {
+
+ ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+ int fieldCount = fileStoreTable.rowType().getFieldCount();
+ List<DataField> pkFields = fileStoreTable.schema().primaryKeysFields();
+ if (project != null) {
+ readBuilder = applyProject(readBuilder, project, fieldCount, pkFields);
+ }
+
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+
+ TableRead tableRead = readBuilder.newRead();
+ paimonRowType = readBuilder.readType();
+
+ org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+ tableRead.createReader(split.dataSplit());
+ iterator =
+ new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+ recordReader.toCloseableIterator(), paimonRowType);
+ }
+
+ @Override
+ public CloseableIterator<LogRecord> read() throws IOException {
+ return iterator;
+ }
+
+ // TODO: Support primary key projection and obtain primary key index for merging.
+ private ReadBuilder applyProject(
+ ReadBuilder readBuilder, int[][] projects, int fieldCount, List<DataField> pkFields) {
+ int[] pkIds = pkFields.stream().mapToInt(DataField::id).toArray();
+
+ int[] projectIds = Arrays.stream(projects).mapToInt(project -> project[0]).toArray();
+
+ int bucketFieldPos = fieldCount - 3;
Review Comment:
Do we really need `bucketFieldPos`?
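For context, the index arithmetic under discussion can be sketched on its own. `mergeProjection` below is a hypothetical stand-alone version of what `applyProject` computes (the real method wraps this in a `ReadBuilder`), assuming the last three fields of the Paimon row are the bucket, offset, and timestamp system columns:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class ProjectionSketch {

    /**
     * Merges user-projected field positions with primary-key positions
     * (deduplicated), then appends the three trailing system columns
     * (bucket, offset, timestamp) at positions fieldCount - 3 .. fieldCount - 1.
     */
    public static int[] mergeProjection(int[] projectIds, int[] pkIds, int fieldCount) {
        int bucketFieldPos = fieldCount - 3;
        int offsetFieldPos = fieldCount - 2;
        int timestampFieldPos = fieldCount - 1;
        return IntStream.concat(
                        IntStream.concat(IntStream.of(projectIds), IntStream.of(pkIds))
                                .distinct(),
                        IntStream.of(bucketFieldPos, offsetFieldPos, timestampFieldPos))
                .toArray();
    }

    public static void main(String[] args) {
        // 3 data columns + 3 system columns; user projects column 1, pk is column 0
        int[] merged = mergeProjection(new int[] {1}, new int[] {0}, 6);
        System.out.println(Arrays.toString(merged)); // [1, 0, 3, 4, 5]
    }
}
```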
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Record reader for paimon table. */
+public class PaimonRecordReader implements RecordReader {
+
+ protected PaimonRowAsFlussRecordIterator iterator;
+ protected @Nullable int[][] project;
+ protected @Nullable Predicate predicate;
+ protected RowType paimonRowType;
+
+ public PaimonRecordReader(
+ FileStoreTable fileStoreTable,
+ PaimonSplit split,
+ @Nullable int[][] project,
+ @Nullable Predicate predicate)
+ throws IOException {
+
+ ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+ int fieldCount = fileStoreTable.rowType().getFieldCount();
+ List<DataField> pkFields = fileStoreTable.schema().primaryKeysFields();
+ if (project != null) {
+ readBuilder = applyProject(readBuilder, project, fieldCount, pkFields);
+ }
+
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+
+ TableRead tableRead = readBuilder.newRead();
+ paimonRowType = readBuilder.readType();
+
+ org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+ tableRead.createReader(split.dataSplit());
+ iterator =
+ new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+ recordReader.toCloseableIterator(), paimonRowType);
+ }
+
+ @Override
+ public CloseableIterator<LogRecord> read() throws IOException {
+ return iterator;
+ }
+
+ // TODO: Support primary key projection and obtain primary key index for merging.
Review Comment:
If the upper caller handles adding the extra primary key columns, is this TODO still needed?
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplitPlanner.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Split planner for paimon table. */
+public class PaimonSplitPlanner implements Planner<PaimonSplit> {
+
+ private final Configuration paimonConfig;
+ private final TablePath tablePath;
+ private final Predicate predicate;
+ private final long snapshotId;
+
+ public PaimonSplitPlanner(
+ Configuration paimonConfig, TablePath tablePath, Predicate predicate, long snapshotId) {
Review Comment:
nit:
```suggestion
Configuration paimonConfig, TablePath tablePath, @Nullable Predicate predicate, long snapshotId) {
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Record reader for paimon table. */
+public class PaimonRecordReader implements RecordReader {
+
+ protected PaimonRowAsFlussRecordIterator iterator;
+ protected @Nullable int[][] project;
+ protected @Nullable Predicate predicate;
+ protected RowType paimonRowType;
+
+ public PaimonRecordReader(
+ FileStoreTable fileStoreTable,
+ PaimonSplit split,
+ @Nullable int[][] project,
+ @Nullable Predicate predicate)
+ throws IOException {
+
+ ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+ int fieldCount = fileStoreTable.rowType().getFieldCount();
+ List<DataField> pkFields = fileStoreTable.schema().primaryKeysFields();
+ if (project != null) {
+ readBuilder = applyProject(readBuilder, project, fieldCount, pkFields);
+ }
+
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+
+ TableRead tableRead = readBuilder.newRead();
+ paimonRowType = readBuilder.readType();
+
+ org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+ tableRead.createReader(split.dataSplit());
+ iterator =
+ new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+ recordReader.toCloseableIterator(), paimonRowType);
+ }
+
+ @Override
+ public CloseableIterator<LogRecord> read() throws IOException {
+ return iterator;
+ }
+
+ // TODO: Support primary key projection and obtain primary key index for merging.
+ private ReadBuilder applyProject(
+ ReadBuilder readBuilder, int[][] projects, int fieldCount, List<DataField> pkFields) {
+ int[] pkIds = pkFields.stream().mapToInt(DataField::id).toArray();
+
+ int[] projectIds = Arrays.stream(projects).mapToInt(project -> project[0]).toArray();
+
+ int bucketFieldPos = fieldCount - 3;
+ int offsetFieldPos = fieldCount - 2;
+ int timestampFieldPos = fieldCount - 1;
+
+ int[] paimonProject =
+ IntStream.concat(
+ IntStream.concat(IntStream.of(projectIds), IntStream.of(pkIds))
+ .distinct(),
+ IntStream.of(bucketFieldPos, offsetFieldPos, timestampFieldPos))
+ .toArray();
+
+ return readBuilder.withProjection(paimonProject);
+ }
+
+ /** Iterator for paimon row as fluss record. */
+ public static class PaimonRowAsFlussRecordIterator implements CloseableIterator<LogRecord> {
+
+ private final org.apache.paimon.utils.CloseableIterator<InternalRow> paimonRowIterator;
+
+ private final ProjectedRow projectedRow;
+
+ private final int logOffsetColIndex;
+ private final int timestampColIndex;
+
+ public PaimonRowAsFlussRecordIterator(
+ org.apache.paimon.utils.CloseableIterator<InternalRow> paimonRowIterator,
+ RowType paimonRowType) {
+ this.paimonRowIterator = paimonRowIterator;
+ this.logOffsetColIndex = paimonRowType.getFieldIndex(OFFSET_COLUMN_NAME);
+ this.timestampColIndex = paimonRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
+
+ int[] project = IntStream.range(0, paimonRowType.getFieldCount() - 3).toArray();
+ projectedRow = ProjectedRow.from(project);
+ }
+
+ @Override
+ public void close() {
+ try {
+ paimonRowIterator.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to close iterator.", e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return paimonRowIterator.hasNext();
+ }
+
+ @Override
+ public LogRecord next() {
+ InternalRow paimonRow = paimonRowIterator.next();
+ ChangeType changeType = toChangeType(paimonRow.getRowKind());
+ long offset = paimonRow.getLong(logOffsetColIndex);
+ long timestamp = paimonRow.getTimestamp(timestampColIndex, 6).getMillisecond();
+
+ return new GenericRecord(
+ offset,
+ timestamp,
+ changeType,
+ projectedRow.replaceRow(new PaimonRowAsFlussRow(paimonRow)));
Review Comment:
nit:
We can predefine a `PaimonRowAsFlussRow` object and call
```
paimonRowAsFlussRow.replaceRow(paimonRow)
```
instead of creating a new `PaimonRowAsFlussRow` for every record.
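The reuse pattern this comment suggests can be sketched with a hypothetical mutable wrapper (the `RowWrapper` class and its `replaceRow`/`get` methods below are illustrative, not the actual `PaimonRowAsFlussRow` API): allocate the adapter once and re-point it at each new underlying row, avoiding one allocation per record.

```java
public class WrapperReuseSketch {

    /** Hypothetical mutable adapter: wraps an underlying row without copying it. */
    static final class RowWrapper {
        private Object[] underlying;

        /** Re-points this wrapper at a new row and returns itself, avoiding allocation. */
        RowWrapper replaceRow(Object[] row) {
            this.underlying = row;
            return this;
        }

        Object get(int pos) {
            return underlying[pos];
        }
    }

    static String collect(Object[][] rows) {
        // Predefine one wrapper outside the per-record loop ...
        RowWrapper wrapper = new RowWrapper();
        StringBuilder seen = new StringBuilder();
        for (Object[] row : rows) {
            // ... and reuse it for every record instead of allocating a new wrapper.
            RowWrapper current = wrapper.replaceRow(row);
            seen.append(current.get(0)).append(current.get(1));
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(collect(new Object[][] {{1, "a"}, {2, "b"}})); // 1a2b
    }
}
```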
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplitPlanner.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Split planner for paimon table. */
+public class PaimonSplitPlanner implements Planner<PaimonSplit> {
+
+ private final Configuration paimonConfig;
+ private final TablePath tablePath;
+ private final Predicate predicate;
Review Comment:
```suggestion
private @Nullable final Predicate predicate;
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/FlussRowAsPaimonRowTest.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static com.alibaba.fluss.record.ChangeType.DELETE;
+import static com.alibaba.fluss.record.ChangeType.INSERT;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link FlussRowAsPaimonRow}. */
+public class FlussRowAsPaimonRowTest {
Review Comment:
```suggestion
class FlussRowAsPaimonRowTest {
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/PaimonLakeHouseTestBase.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.flink;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.paimon.PaimonLakeStorage;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Base class for paimon lakehouse test. */
+public class PaimonLakeHouseTestBase {
Review Comment:
nit:
```suggestion
class PaimonLakeHouseTestBase {
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReaderTest.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonRecordReader}. */
+public class PaimonRecordReaderTest extends PaimonLakeHouseTestBase {
Review Comment:
nit:
```suggestion
class PaimonRecordReaderTest extends PaimonLakeHouseTestBase {
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplitTest.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplit}. */
+public class PaimonSplitTest extends PaimonLakeHouseTestBase {
+
+ @Test
+ void testPaimonSplit() throws Exception {
+ // prepare paimon table
+ int bucketNum = 1;
+ TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .column("c3", DataTypes.STRING());
+ builder.partitionKeys("c3");
+ builder.primaryKey("c1", "c3");
+ builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+ createTable(tablePath, builder.build());
+ Table table =
+ paimonCatalog.getTable(
+ Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()));
+
+ GenericRow record1 =
+ GenericRow.of(12, BinaryString.fromString("a"), BinaryString.fromString("A"));
+ writeRecord(tablePath, Arrays.asList(record1));
+ Snapshot snapshot = table.latestSnapshot().get();
+
+ LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+ List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+ // test bucket() and partition() method
+ PaimonSplit paimonSplit = paimonSplits.get(0);
+ assertThat(paimonSplit.partition()).isEqualTo(Arrays.asList("A"));
Review Comment:
```suggestion
assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("A"));
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplitPlannerTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplitPlanner}. */
+public class PaimonSplitPlannerTest extends PaimonLakeHouseTestBase {
+ @Test
+ void testPlann() throws Exception {
Review Comment:
```suggestion
void testPlan() throws Exception {
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplitPlannerTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplitPlanner}. */
+public class PaimonSplitPlannerTest extends PaimonLakeHouseTestBase {
Review Comment:
nit:
```suggestion
class PaimonSplitPlannerTest extends PaimonLakeHouseTestBase {
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSplitTest.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplit}. */
+public class PaimonSplitTest extends PaimonLakeHouseTestBase {
+
+ @Test
+ void testPaimonSplit() throws Exception {
+ // prepare paimon table
+ int bucketNum = 1;
+ TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .column("c3", DataTypes.STRING());
+ builder.partitionKeys("c3");
+ builder.primaryKey("c1", "c3");
+ builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+ createTable(tablePath, builder.build());
+ Table table =
+ paimonCatalog.getTable(
+ Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()));
+
+ GenericRow record1 =
+ GenericRow.of(12, BinaryString.fromString("a"), BinaryString.fromString("A"));
+ writeRecord(tablePath, Arrays.asList(record1));
Review Comment:
```suggestion
writeRecord(tablePath, Collections.singletonList(record1));
```
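As a standalone aside on this suggestion (plain JDK, unrelated to the Fluss/Paimon classes in this PR; the class name `SingletonListDemo` is illustrative only): `Collections.singletonList` avoids the varargs array that `Arrays.asList` allocates and, unlike `Arrays.asList`, rejects even in-place writes, which is a reasonable default for one-off test inputs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingletonListDemo {
    // Returns true if the list rejects in-place writes via set(...).
    static boolean rejectsSet(List<String> list) {
        try {
            list.set(0, "changed");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Arrays.asList: fixed-size view over an array, but set(...) is allowed.
        System.out.println(rejectsSet(Arrays.asList("record1"))); // false
        // Collections.singletonList: fully immutable single-element list.
        System.out.println(rejectsSet(Collections.singletonList("record1"))); // true
    }
}
```

Note the suggested change also needs `import java.util.Collections;` in the test file.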
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSortedRecordReaderTest.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.paimon.utils.FlussConversions.convertToFlinkRow;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSortedRecordReader}. */
+public class PaimonSortedRecordReaderTest extends PaimonLakeHouseTestBase {
Review Comment:
nit:
```suggestion
class PaimonSortedRecordReaderTest extends PaimonLakeHouseTestBase {
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReaderTest.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonRecordReader}. */
+public class PaimonRecordReaderTest extends PaimonLakeHouseTestBase {
+ @BeforeAll
+ protected static void beforeAll() {
+ PaimonLakeHouseTestBase.beforeAll();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testReadLogTable(boolean isPartitioned) throws Exception {
+ // first of all, create table and prepare data
+ String tableName = "logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+ prepareLogTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM, writtenRows);
+
+ LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+ List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Split> splits = ((FileStoreTable) table).newScan().plan().splits();
+ assertThat(splits).hasSize(paimonSplits.size());
+ assertThat(splits)
+ .isEqualTo(
+ paimonSplits.stream()
+ .map(PaimonSplit::dataSplit)
+ .collect(Collectors.toList()));
+
+ List<Row> actual = new ArrayList<>();
+
+ InternalRow.FieldGetter[] fieldGetters =
Review Comment:
Can
```
InternalRow.FieldGetter[] fieldGetters =
        InternalRow.createFieldGetters(getFlussRowType(isPartitioned));
for (PaimonSplit paimonSplit : paimonSplits) {
    RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
    CloseableIterator<LogRecord> iterator = recordReader.read();
    while (iterator.hasNext()) {
        InternalRow row = iterator.next().getRow();
        Row flinkRow = new Row(fieldGetters.length);
        for (int i = 0; i < fieldGetters.length; i++) {
            flinkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
        }
        actual.add(flinkRow);
    }
    iterator.close();
}
```
be extracted into a common helper method so that the following test method `testReadLogTableWithProject` can also reuse it?
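The extraction could take a "drain and convert" shape. A minimal sketch with plain JDK types standing in for the Fluss classes (the name `collectRows` and the use of `Iterator`/`Function` are illustrative; the real helper would take Fluss's `CloseableIterator<LogRecord>` and the `FieldGetter`-based conversion from the loop above):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class CollectRowsDemo {
    // Drains an iterator, converting each element with the supplied function.
    // This is the generic shape the shared test helper could take.
    static <T, R> List<R> collectRows(Iterator<T> iterator, Function<T, R> converter) {
        List<R> out = new ArrayList<>();
        while (iterator.hasNext()) {
            out.add(converter.apply(iterator.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> raw = Arrays.asList(1, 2, 3);
        // The converter plays the role of the FieldGetter loop in the test.
        List<String> rows = collectRows(raw.iterator(), i -> "row-" + i);
        System.out.println(rows); // [row-1, row-2, row-3]
    }
}
```

With that helper in the test base class, each test method would only build its own field getters and splits, then delegate the read loop.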
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonRecordReaderTest.java:
##########
@@ -0,0 +1,321 @@
Review Comment:
And I think the tests in `PaimonSortedRecordReaderTest` can also reuse it.
##########
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/lakehouse/FlussRowAsPaimonRow.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
Review Comment:
Change `com.alibaba.fluss.lake.paimon.lakehouse` to `com.alibaba.fluss.lake.paimon.source`. Currently, `lakehouse` is not self-explanatory.
##########
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/lakehouse/PaimonSortedRecordReaderTest.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.lakehouse;
+
+import com.alibaba.fluss.lake.paimon.flink.PaimonLakeHouseTestBase;
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.lake.paimon.utils.FlussConversions.convertToFlinkRow;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSortedRecordReader}. */
+public class PaimonSortedRecordReaderTest extends PaimonLakeHouseTestBase {
+ @BeforeAll
+ protected static void beforeAll() {
+ PaimonLakeHouseTestBase.beforeAll();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testReadPkTable(boolean isPartitioned) throws Exception {
+ // first of all, create table and prepare data
+ String tableName = "logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+ preparePkTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM, writtenRows);
+
+ LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+ List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Row> actual = new ArrayList<>();
+
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(getFlussRowType(isPartitioned));
+ for (PaimonSplit paimonSplit : paimonSplits) {
+ RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
+ assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next().getRow();
+ actual.add(convertToFlinkRow(row, fieldGetters));
+ }
+ iterator.close();
+ }
+ List<Row> expectRows =
+ writtenRows.stream()
+ .map(r -> convertToFlinkRow(r, fieldGetters))
+ .collect(Collectors.toList());
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+
+ ArrayList<InternalRow> logRows = new ArrayList<>();
+ prepareFlussLogRows(5, isPartitioned ? "test" : null, logRows);
+ ArrayList<Row> expectedRows = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ GenericRow row =
+ isPartitioned
+ ? row(
+ i + 30,
+ true,
+ (byte) 100,
+ (short) 200,
+ 400L,
+ 500.1f,
+ 600.0d,
+ BinaryString.fromString(
+ i < 5 ? "update_string_" + i : "another_string_" + i),
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ BinaryString.fromString("test"),
+ 0,
+ (long) 10 + i,
+ TimestampNtz.fromMillis(System.currentTimeMillis()))
+ : row(
+ i + 30,
+ true,
+ (byte) 100,
+ (short) 200,
+ 400L,
+ 500.1f,
+ 600.0d,
+ BinaryString.fromString(
+ i < 5 ? "update_string_" + i : "another_string_" + i),
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ 0,
+ (long) 10 + i,
+ TimestampNtz.fromMillis(System.currentTimeMillis()));
+ expectedRows.add(convertToFlinkRow(row, fieldGetters));
+ }
+
+ List<Row> actualMergeRows =
+ mergeReadLakeRows(lakeSource, paimonSplits, fieldGetters, logRows);
+
+ assertThat(actualMergeRows).containsExactlyInAnyOrderElementsOf(expectedRows);
+ }
+
+ @Test
+ void testReadPkTableWithProjectNoPk() throws Exception {
+ // first of all, create table and prepare data
+ String tableName = "logTable_non_partitioned";
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+ preparePkTable(tablePath, false, DEFAULT_BUCKET_NUM, writtenRows);
+
+ LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+ lakeSource.withProject(new int[][] {new int[] {5}, new int[] {1}, new int[] {3}});
+ List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Row> actual = new ArrayList<>();
+
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(
+ RowType.of(
+ new FloatType(),
+ new BooleanType(),
+ new SmallIntType(),
+ new IntType()));
+ for (PaimonSplit paimonSplit : paimonSplits) {
+ RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
+ assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next().getRow();
+ actual.add(convertToFlinkRow(row, fieldGetters));
+ }
+ iterator.close();
+ }
+
+ List<Row> expectRows = new ArrayList<>();
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {5, 1, 3, 0});
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ TableRead read = readBuilder.newRead();
+ org.apache.paimon.reader.RecordReader<org.apache.paimon.data.InternalRow> reader =
+ read.createReader(splits);
+ reader.forEachRemaining(
+ paimonRow -> {
+ PaimonRowAsFlussRow row = new PaimonRowAsFlussRow(paimonRow);
+ Row flinkRow = new Row(fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ flinkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ expectRows.add(flinkRow);
+ });
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+ }
+
+ @Test
+ void testReadPkTableWithProjectPk() throws Exception {
+ // first of all, create table and prepare data
+ String tableName = "logTable_non_partitioned";
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+ preparePkTable(tablePath, false, DEFAULT_BUCKET_NUM, writtenRows);
+
+ LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+ lakeSource.withProject(
+ new int[][] {new int[] {5}, new int[] {1}, new int[] {3}, new int[] {0}});
+ List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Row> actual = new ArrayList<>();
+
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(
+ RowType.of(
+ new FloatType(),
+ new BooleanType(),
+ new SmallIntType(),
+ new IntType()));
+ for (PaimonSplit paimonSplit : paimonSplits) {
+ RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
+ assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next().getRow();
+ actual.add(convertToFlinkRow(row, fieldGetters));
+ }
+ iterator.close();
+ }
+
+ List<Row> expectRows = new ArrayList<>();
Review Comment:
We can also extract
```
List<Row> expectRows = new ArrayList<>();
ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {5, 1, 3, 0});
List<Split> splits = readBuilder.newScan().plan().splits();
TableRead read = readBuilder.newRead();
org.apache.paimon.reader.RecordReader<org.apache.paimon.data.InternalRow> reader =
        read.createReader(splits);
reader.forEachRemaining(
        paimonRow -> {
            PaimonRowAsFlussRow row = new PaimonRowAsFlussRow(paimonRow);
            Row flinkRow = new Row(fieldGetters.length);
            for (int i = 0; i < fieldGetters.length; i++) {
                flinkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
            }
            expectRows.add(flinkRow);
        });
```
into a common method so that other tests can reuse it, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]