This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1f4ad5b1e [lake/iceberg] Support iceberg log table union read (#1684)
1f4ad5b1e is described below
commit 1f4ad5b1ef05ad62da0e6dec7e409d81536a743a
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Sep 15 19:05:10 2025 +0800
[lake/iceberg] Support iceberg log table union read (#1684)
---------
Co-authored-by: luoyuxia <[email protected]>
---
.../apache/fluss/flink/utils/DataLakeUtils.java | 7 -
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 14 +-
.../fluss/lake/iceberg/IcebergLakeStorage.java | 6 +-
.../lake/iceberg/source/IcebergLakeSource.java | 85 ++++++++
.../iceberg/source/IcebergRecordAsFlussRow.java | 3 +-
.../lake/iceberg/source/IcebergRecordReader.java | 136 ++++++++++++
.../fluss/lake/iceberg/source/IcebergSplit.java | 67 ++++++
.../lake/iceberg/source/IcebergSplitPlanner.java | 125 +++++++++++
.../iceberg/source/IcebergSplitSerializer.java | 51 +++++
.../iceberg/tiering/IcebergCatalogProvider.java | 12 +-
.../IcebergCatalogUtils.java} | 22 +-
.../flink/FlinkUnionReadLogTableITCase.java | 196 ++++++-----------
.../lake/iceberg/flink/FlinkUnionReadTestBase.java | 58 +++++
.../source/IcebergRecordAsFlussRowTest.java | 6 +-
.../iceberg/source/IcebergRecordReaderTest.java | 239 +++++++++++++++++++++
.../lake/iceberg/source/IcebergSourceTestBase.java | 159 ++++++++++++++
.../iceberg/source/IcebergSplitPlannerTest.java | 147 +++++++++++++
.../iceberg/source/IcebergSplitSerializerTest.java | 88 ++++++++
.../testutils/FlinkIcebergTieringTestBase.java | 29 +++
.../lake/iceberg/utils/IcebergConversionsTest.java | 9 +-
.../fluss/lake/paimon/source/PaimonLakeSource.java | 1 +
.../paimon/flink/FlinkUnionReadLogTableITCase.java | 4 +-
fluss-test-coverage/pom.xml | 1 +
23 files changed, 1269 insertions(+), 196 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
index b4d82109a..7cc09a139 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
@@ -42,13 +42,6 @@ public class DataLakeUtils {
ConfigOptions.TABLE_DATALAKE_FORMAT.key()));
}
- if (datalakeFormat != DataLakeFormat.PAIMON) {
- throw new UnsupportedOperationException(
- String.format(
- "The datalake format %s " + " is not supported.
Only %s is supported.",
- datalakeFormat, DataLakeFormat.PAIMON));
- }
-
// currently, extract datalake catalog config
String dataLakePrefix = "table.datalake." + datalakeFormat + ".";
return extractAndRemovePrefix(tableOptions.toMap(), dataLakePrefix);
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 1f3a2b8cb..842fcddde 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -20,7 +20,7 @@ package org.apache.fluss.lake.iceberg;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableAlreadyExistException;
-import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
@@ -49,13 +49,10 @@ import java.util.Set;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
-import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
/** An Iceberg implementation of {@link LakeCatalog}. */
public class IcebergLakeCatalog implements LakeCatalog {
- public static final String ICEBERG_CATALOG_DEFAULT_NAME =
"fluss-iceberg-catalog";
-
public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new
LinkedHashMap<>();
static {
@@ -74,7 +71,7 @@ public class IcebergLakeCatalog implements LakeCatalog {
private static final String ICEBERG_CONF_PREFIX = "iceberg.";
public IcebergLakeCatalog(Configuration configuration) {
- this.icebergCatalog = createIcebergCatalog(configuration);
+ this.icebergCatalog =
IcebergCatalogUtils.createIcebergCatalog(configuration);
}
@VisibleForTesting
@@ -82,13 +79,6 @@ public class IcebergLakeCatalog implements LakeCatalog {
return icebergCatalog;
}
- private Catalog createIcebergCatalog(Configuration configuration) {
- Map<String, String> icebergProps = configuration.toMap();
- String catalogName = icebergProps.getOrDefault("name",
ICEBERG_CATALOG_DEFAULT_NAME);
- return buildIcebergCatalog(
- catalogName, icebergProps,
IcebergConfiguration.from(configuration).get());
- }
-
@Override
public void createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
throws TableAlreadyExistException {
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
index fc3bf97c7..fd631d252 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
@@ -18,6 +18,8 @@
package org.apache.fluss.lake.iceberg;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.source.IcebergLakeSource;
+import org.apache.fluss.lake.iceberg.source.IcebergSplit;
import org.apache.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
import org.apache.fluss.lake.lakestorage.LakeStorage;
import org.apache.fluss.lake.source.LakeSource;
@@ -44,7 +46,7 @@ public class IcebergLakeStorage implements LakeStorage {
}
@Override
- public LakeSource<?> createLakeSource(TablePath tablePath) {
- throw new UnsupportedOperationException("Not implemented");
+ public LakeSource<IcebergSplit> createLakeSource(TablePath tablePath) {
+ return new IcebergLakeSource(icebergConfig, tablePath);
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
new file mode 100644
index 000000000..31eee6fb5
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.Planner;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+
+/** Iceberg lake source. */
+public class IcebergLakeSource implements LakeSource<IcebergSplit> {
+ private static final long serialVersionUID = 1L;
+ private final Configuration icebergConfig;
+ private final TablePath tablePath;
+ private @Nullable int[][] project;
+
+ public IcebergLakeSource(Configuration icebergConfig, TablePath tablePath)
{
+ this.icebergConfig = icebergConfig;
+ this.tablePath = tablePath;
+ }
+
+ @Override
+ public void withProject(int[][] project) {
+ this.project = project;
+ }
+
+ @Override
+ public void withLimit(int limit) {
+ throw new UnsupportedOperationException("Not impl.");
+ }
+
+ @Override
+ public FilterPushDownResult withFilters(List<Predicate> predicates) {
+ // TODO: Support filter push down. #1676
+ return FilterPushDownResult.of(Collections.emptyList(), predicates);
+ }
+
+ @Override
+ public Planner<IcebergSplit> createPlanner(PlannerContext context) throws
IOException {
+ return new IcebergSplitPlanner(icebergConfig, tablePath,
context.snapshotId());
+ }
+
+ @Override
+ public RecordReader createRecordReader(ReaderContext<IcebergSplit>
context) throws IOException {
+ Catalog catalog =
IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
+ Table table = catalog.loadTable(toIceberg(tablePath));
+ return new IcebergRecordReader(context.lakeSplit().fileScanTask(),
table, project);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<IcebergSplit> getSplitSerializer() {
+ return new IcebergSplitSerializer();
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index da9c564e1..7c7da6c51 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -41,8 +41,9 @@ public class IcebergRecordAsFlussRow implements InternalRow {
public IcebergRecordAsFlussRow() {}
- public void setIcebergRecord(Record icebergRecord) {
+ public IcebergRecordAsFlussRow replaceIcebergRecord(Record icebergRecord) {
this.icebergRecord = icebergRecord;
+ return this;
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
new file mode 100644
index 000000000..d848ec152
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Iceberg record reader. */
+public class IcebergRecordReader implements RecordReader {
+ protected IcebergRecordAsFlussRecordIterator iterator;
+ protected @Nullable int[][] project;
+ protected Types.StructType struct;
+
+ public IcebergRecordReader(FileScanTask fileScanTask, Table table,
@Nullable int[][] project) {
+ TableScan tableScan = table.newScan();
+ if (project != null) {
+ tableScan = applyProject(tableScan, project);
+ }
+ IcebergGenericReader reader = new IcebergGenericReader(tableScan,
true);
+ struct = tableScan.schema().asStruct();
+ this.iterator = new
IcebergRecordAsFlussRecordIterator(reader.open(fileScanTask), struct);
+ }
+
+ @Override
+ public CloseableIterator<LogRecord> read() throws IOException {
+ return iterator;
+ }
+
+ private TableScan applyProject(TableScan tableScan, int[][] projects) {
+ Types.StructType structType = tableScan.schema().asStruct();
+ List<Types.NestedField> cols = new ArrayList<>(projects.length + 2);
+
+ for (int[] project : projects) {
+ cols.add(structType.fields().get(project[0]));
+ }
+
+ cols.add(structType.field(OFFSET_COLUMN_NAME));
+ cols.add(structType.field(TIMESTAMP_COLUMN_NAME));
+ return tableScan.project(new Schema(cols));
+ }
+
+ /** Iterator for iceberg record as fluss record. */
+ public static class IcebergRecordAsFlussRecordIterator implements
CloseableIterator<LogRecord> {
+
+ private final org.apache.iceberg.io.CloseableIterator<Record>
icebergRecordIterator;
+
+ private final ProjectedRow projectedRow;
+ private final IcebergRecordAsFlussRow icebergRecordAsFlussRow;
+
+ private final int logOffsetColIndex;
+ private final int timestampColIndex;
+
+ public IcebergRecordAsFlussRecordIterator(
+ CloseableIterable<Record> icebergRecordIterator,
Types.StructType struct) {
+ this.icebergRecordIterator = icebergRecordIterator.iterator();
+ this.logOffsetColIndex =
struct.fields().indexOf(struct.field(OFFSET_COLUMN_NAME));
+ this.timestampColIndex =
struct.fields().indexOf(struct.field(TIMESTAMP_COLUMN_NAME));
+
+ int[] project = IntStream.range(0, struct.fields().size() -
2).toArray();
+ projectedRow = ProjectedRow.from(project);
+ icebergRecordAsFlussRow = new IcebergRecordAsFlussRow();
+ }
+
+ @Override
+ public void close() {
+ try {
+ icebergRecordIterator.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to close iterator.", e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return icebergRecordIterator.hasNext();
+ }
+
+ @Override
+ public LogRecord next() {
+ Record icebergRecord = icebergRecordIterator.next();
+ long offset = icebergRecord.get(logOffsetColIndex, Long.class);
+ long timestamp =
+ icebergRecord
+ .get(timestampColIndex, OffsetDateTime.class)
+ .toInstant()
+ .toEpochMilli();
+
+ return new GenericRecord(
+ offset,
+ timestamp,
+ ChangeType.INSERT,
+ projectedRow.replaceRow(
+
icebergRecordAsFlussRow.replaceIcebergRecord(icebergRecord)));
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplit.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplit.java
new file mode 100644
index 000000000..65cc10a09
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplit.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSplit;
+
+import org.apache.iceberg.FileScanTask;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Split for Iceberg table. */
+public class IcebergSplit implements LakeSplit, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final FileScanTask fileScanTask;
+ private final int bucket;
+ private final List<String> partition;
+
+ public IcebergSplit(FileScanTask fileScanTask, int bucket, List<String>
partition) {
+ this.fileScanTask = fileScanTask;
+ this.bucket = bucket;
+ this.partition = partition;
+ }
+
+ @Override
+ public int bucket() {
+ return bucket;
+ }
+
+ @Override
+ public List<String> partition() {
+ return partition;
+ }
+
+ public FileScanTask fileScanTask() {
+ return fileScanTask;
+ }
+
+ @Override
+ public String toString() {
+ return "IcebergSplit{"
+ + "task="
+ + fileScanTask
+ + ", bucket="
+ + bucket
+ + ", partition="
+ + partition
+ + '}';
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
new file mode 100644
index 000000000..89a5dc269
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
+import org.apache.fluss.lake.source.Planner;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+
+/** Iceberg split planner. */
+public class IcebergSplitPlanner implements Planner<IcebergSplit> {
+
+ private final Configuration icebergConfig;
+ private final TablePath tablePath;
+ private final long snapshotId;
+
+ public IcebergSplitPlanner(Configuration icebergConfig, TablePath
tablePath, long snapshotId) {
+ this.icebergConfig = icebergConfig;
+ this.tablePath = tablePath;
+ this.snapshotId = snapshotId;
+ }
+
+ @Override
+ public List<IcebergSplit> plan() throws IOException {
+ List<IcebergSplit> splits = new ArrayList<>();
+ Catalog catalog =
IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
+ Table table = catalog.loadTable(toIceberg(tablePath));
+ Function<FileScanTask, List<String>> partitionExtract =
createPartitionExtractor(table);
+ Function<FileScanTask, Integer> bucketExtractor =
createBucketExtractor(table);
+ try (CloseableIterable<FileScanTask> tasks =
+ table.newScan()
+ .useSnapshot(snapshotId)
+ .includeColumnStats()
+ .ignoreResiduals()
+ .planFiles()) {
+ tasks.forEach(
+ task ->
+ splits.add(
+ new IcebergSplit(
+ task,
+ bucketExtractor.apply(task),
+ partitionExtract.apply(task))));
+ }
+ return splits;
+ }
+
+ private Function<FileScanTask, Integer> createBucketExtractor(Table table)
{
+ PartitionSpec partitionSpec = table.spec();
+ List<PartitionField> partitionFields = partitionSpec.fields();
+
+ // the last one must be partition by fluss bucket
+ PartitionField bucketField =
partitionFields.get(partitionFields.size() - 1);
+
+ if (table.schema()
+ .asStruct()
+ .field(bucketField.sourceId())
+ .name()
+ .equals(BUCKET_COLUMN_NAME)) {
+ // partition by __bucket column, should be fluss log table without
bucket key,
+ // we don't care about the bucket since it's bucket un-aware
+ return task -> -1;
+ } else {
+ int bucketFieldIndex = partitionFields.size() - 1;
+ return task -> task.file().partition().get(bucketFieldIndex,
Integer.class);
+ }
+ }
+
+ private Function<FileScanTask, List<String>>
createPartitionExtractor(Table table) {
+ PartitionSpec partitionSpec = table.spec();
+ List<PartitionField> partitionFields = partitionSpec.fields();
+
+ // if only one partition, it must not be partitioned table since we
will always use
+ // partition by fluss bucket
+ if (partitionSpec.fields().size() <= 1) {
+ return task -> Collections.emptyList();
+ } else {
+ List<Integer> partitionFieldIndices =
+ // since will always first partition by fluss partition
columns, then fluss
+ // bucket,
+ // just ignore the last partition column of iceberg
+ IntStream.range(0, partitionFields.size() - 1)
+ .boxed()
+ .collect(Collectors.toList());
+ return task ->
+ partitionFieldIndices.stream()
+ // since currently, only string partition is
supported
+ .map(index -> task.partition().get(index,
String.class))
+ .collect(Collectors.toList());
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializer.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializer.java
new file mode 100644
index 000000000..507f49d64
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.utils.InstantiationUtils;
+
+import java.io.IOException;
+
+/** Serializer for Iceberg split. */
+public class IcebergSplitSerializer implements
SimpleVersionedSerializer<IcebergSplit> {
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(IcebergSplit icebergSplit) throws IOException {
+ return InstantiationUtils.serializeObject(icebergSplit);
+ }
+
+ @Override
+ public IcebergSplit deserialize(int version, byte[] serialized) throws
IOException {
+ IcebergSplit icebergSplit;
+ try {
+ icebergSplit =
+ InstantiationUtils.deserializeObject(serialized,
getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return icebergSplit;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
index 3c53918ec..db468035f 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
@@ -18,15 +18,11 @@
package org.apache.fluss.lake.iceberg.tiering;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
import org.apache.iceberg.catalog.Catalog;
import java.io.Serializable;
-import java.util.Map;
-
-import static
org.apache.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
-import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
/** A provider for Iceberg catalog. */
public class IcebergCatalogProvider implements Serializable {
@@ -39,10 +35,6 @@ public class IcebergCatalogProvider implements Serializable {
}
public Catalog get() {
- Map<String, String> icebergProps = icebergConfig.toMap();
- String catalogName = icebergProps.getOrDefault("name",
ICEBERG_CATALOG_DEFAULT_NAME);
-
- return buildIcebergCatalog(
- catalogName, icebergProps,
IcebergConfiguration.from(icebergConfig).get());
+ return IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergCatalogUtils.java
similarity index 67%
copy from
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
copy to
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergCatalogUtils.java
index 3c53918ec..8d91fef14 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergCatalogUtils.java
@@ -15,34 +15,26 @@
* limitations under the License.
*/
-package org.apache.fluss.lake.iceberg.tiering;
+package org.apache.fluss.lake.iceberg.utils;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
import org.apache.iceberg.catalog.Catalog;
-import java.io.Serializable;
import java.util.Map;
-import static
org.apache.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
-/** A provider for Iceberg catalog. */
-public class IcebergCatalogProvider implements Serializable {
+/** Iceberg catalog utils. */
+public class IcebergCatalogUtils {
- private static final long serialVersionUID = 1L;
- private final Configuration icebergConfig;
+ public static final String ICEBERG_CATALOG_DEFAULT_NAME =
"fluss-iceberg-catalog";
- public IcebergCatalogProvider(Configuration icebergConfig) {
- this.icebergConfig = icebergConfig;
- }
-
- public Catalog get() {
- Map<String, String> icebergProps = icebergConfig.toMap();
+ public static Catalog createIcebergCatalog(Configuration configuration) {
+ Map<String, String> icebergProps = configuration.toMap();
String catalogName = icebergProps.getOrDefault("name",
ICEBERG_CATALOG_DEFAULT_NAME);
-
return buildIcebergCatalog(
- catalogName, icebergProps,
IcebergConfiguration.from(icebergConfig).get());
+ catalogName, icebergProps,
IcebergConfiguration.from(configuration).get());
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
similarity index 61%
copy from
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
copy to
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 987bc0451..5b07b68f4 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,49 +16,42 @@
* limitations under the License.
*/
-package org.apache.fluss.lake.paimon.flink;
+package org.apache.fluss.lake.iceberg.flink;
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
-import java.io.File;
+import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
-import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
-/** The IT case for Flink union data in lake and fluss for log table. */
-class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
-
- @TempDir public static File savepointDir;
-
+/** Test union read of a log table with all supported data types. */
+public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
@BeforeAll
protected static void beforeAll() {
FlinkUnionReadTestBase.beforeAll();
@@ -77,22 +71,22 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
// wait until records has been synced
waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
- // now, start to read the log table, which will read paimon
- // may read fluss or not, depends on the log offset of paimon snapshot
+ // now, start to read the log table, which will read iceberg
+ // may read fluss or not, depends on the log offset of iceberg snapshot
List<Row> actual =
CollectionUtil.iteratorToList(
batchTEnv.executeSql("select * from " +
tableName).collect());
assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
- // can database sync job
+ // cancel the tiering job
jobClient.cancel().get();
// write some log data again
writtenRows.addAll(writeRows(t1, 3, isPartitioned));
// query the log table again and check the data
- // it should read both paimon snapshot and fluss log
+ // it should read both iceberg snapshot and fluss log
actual =
CollectionUtil.iteratorToList(
batchTEnv.executeSql("select * from " +
tableName).collect());
@@ -117,9 +111,10 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
// check if the plan contains partition filter
+ // TODO: push down iceberg partition filter
assertThat(plan)
.contains("TableSourceScan(")
- .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
+ .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" +
partition + "'");
List<Row> expectedFiltered =
writtenRows.stream()
@@ -134,114 +129,6 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
}
}
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
- // first of all, start tiering
- JobClient jobClient = buildTieringJob(execEnv);
-
- String tableName = "stream_logTable_" + (isPartitioned ? "partitioned"
: "non_partitioned");
-
- TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
- List<Row> writtenRows = new LinkedList<>();
- long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned,
writtenRows);
- // wait until records has been synced
- waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
-
- // now, start to read the log table, which will read paimon
- // may read fluss or not, depends on the log offset of paimon snapshot
- CloseableIterator<Row> actual =
- streamTEnv.executeSql("select * from " + tableName).collect();
- assertResultsIgnoreOrder(
- actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
-
- // can database sync job
- jobClient.cancel().get();
-
- // write some log data again
- writtenRows.addAll(writeRows(t1, 3, isPartitioned));
-
- // query the log table again and check the data
- // it should read both paimon snapshot and fluss log
- actual =
- streamTEnv
- .executeSql(
- "select * from "
- + tableName
- + " /*+
OPTIONS('scan.partition.discovery.interval'='100ms') */")
- .collect();
- if (isPartitioned) {
- // we write to a new partition to verify partition discovery
- writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
- }
- assertResultsIgnoreOrder(
- actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception
{
- // first of all, start tiering
- JobClient jobClient = buildTieringJob(execEnv);
-
- String tableName1 =
- "restore_logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
- String resultTableName =
- "result_table" + (isPartitioned ? "partitioned" :
"non_partitioned");
-
- TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
- TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
- List<Row> writtenRows = new LinkedList<>();
- long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM,
isPartitioned, writtenRows);
- // wait until records has been synced
- waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned);
-
- StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
- // now, start to read the log table to write to a fluss result table
- // may read fluss or not, depends on the log offset of paimon snapshot
- createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
- TableResult insertResult =
- streamTEnv.executeSql(
- "insert into " + resultTableName + " select * from " +
tableName1);
-
- CloseableIterator<Row> actual =
- streamTEnv.executeSql("select * from " +
resultTableName).collect();
- if (isPartitioned) {
- assertRowResultsIgnoreOrder(actual, writtenRows, false);
- } else {
- assertResultsExactOrder(actual, writtenRows, false);
- }
-
- // now, stop the job with save point
- String savepointPath =
- insertResult
- .getJobClient()
- .get()
- .stopWithSavepoint(
- false,
- savepointDir.getAbsolutePath(),
- SavepointFormatType.CANONICAL)
- .get();
-
- // re buildSteamTEnv
- streamTEnv = buildSteamTEnv(savepointPath);
- insertResult =
- streamTEnv.executeSql(
- "insert into " + resultTableName + " select * from " +
tableName1);
-
- // write some log data again
- List<Row> rows = writeRows(table1, 3, isPartitioned);
- if (isPartitioned) {
- assertRowResultsIgnoreOrder(actual, rows, true);
- } else {
- assertResultsExactOrder(actual, rows, true);
- }
-
- // cancel jobs
- insertResult.getJobClient().get().cancel().get();
- jobClient.cancel().get();
- }
-
private long prepareLogTable(
TablePath tablePath, int bucketNum, boolean isPartitioned,
List<Row> flinkRows)
throws Exception {
@@ -263,6 +150,43 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
return t1Id;
}
+ protected long createFullTypeLogTable(TablePath tablePath, int bucketNum,
boolean isPartitioned)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("f_boolean", DataTypes.BOOLEAN())
+ .column("f_byte", DataTypes.TINYINT())
+ .column("f_short", DataTypes.SMALLINT())
+ .column("f_int", DataTypes.INT())
+ .column("f_long", DataTypes.BIGINT())
+ .column("f_float", DataTypes.FLOAT())
+ .column("f_double", DataTypes.DOUBLE())
+ .column("f_string", DataTypes.STRING())
+ .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+ .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+ .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
+ .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
+ .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+ .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+ .column("f_binary", DataTypes.BINARY(4));
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder()
+ .distributedBy(bucketNum, "f_int")
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ if (isPartitioned) {
+ schemaBuilder.column("p", DataTypes.STRING());
+ tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true);
+ tableBuilder.partitionedBy("p");
+ tableBuilder.property(
+ ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR);
+ }
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
private List<Row> writeFullTypeRows(
TablePath tablePath, int rowCount, @Nullable String partition)
throws Exception {
List<InternalRow> rows = new ArrayList<>();
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
new file mode 100644
index 000000000..8438c2499
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+
+/** Base class for Iceberg union read tests. */
+class FlinkUnionReadTestBase extends FlinkIcebergTieringTestBase {
+ protected static final String DEFAULT_DB = "fluss";
+
+ protected static final String CATALOG_NAME = "test_iceberg_lake";
+ protected static final int DEFAULT_BUCKET_NUM = 1;
+ StreamTableEnvironment batchTEnv;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkIcebergTieringTestBase.beforeAll();
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ super.beforeEach();
+ String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+ // create table environment
+ batchTEnv = StreamTableEnvironment.create(execEnv,
EnvironmentSettings.inBatchMode());
+ // create catalog using SQL
+ batchTEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' =
'%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(),
bootstrapServers));
+ batchTEnv.executeSql("use catalog " + CATALOG_NAME);
+ batchTEnv.executeSql("use " + DEFAULT_DB);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 85cbe9f68..42ac4d6fb 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -79,7 +79,7 @@ class IcebergRecordAsFlussRowTest {
record.setField("__offset", 100L);
record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
- icebergRecordAsFlussRow.setIcebergRecord(record);
+ icebergRecordAsFlussRow.replaceIcebergRecord(record);
// Should return count excluding system columns (3 system columns)
assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
@@ -91,7 +91,7 @@ class IcebergRecordAsFlussRowTest {
record.setField("name", null); // null value
record.setField("age", 30);
- icebergRecordAsFlussRow.setIcebergRecord(record);
+ icebergRecordAsFlussRow.replaceIcebergRecord(record);
assertThat(icebergRecordAsFlussRow.isNullAt(0)).isFalse(); // id
assertThat(icebergRecordAsFlussRow.isNullAt(1)).isTrue(); // name
@@ -120,7 +120,7 @@ class IcebergRecordAsFlussRowTest {
record.setField("__offset", 100L);
record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
- icebergRecordAsFlussRow.setIcebergRecord(record);
+ icebergRecordAsFlussRow.replaceIcebergRecord(record);
// Test all data type conversions
assertThat(icebergRecordAsFlussRow.getLong(0)).isEqualTo(12345L); // id
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
new file mode 100644
index 000000000..bcf559348
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergRecordReader}. */
+class IcebergRecordReaderTest extends IcebergSourceTestBase {
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testReadLogTable(boolean isPartitioned) throws Exception {
+ // prepare iceberg table
+ TablePath tablePath =
+ TablePath.of(
+ DEFAULT_DB, isPartitioned ? DEFAULT_TABLE +
"_partitioned" : DEFAULT_TABLE);
+ createFullTypeLogTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM);
+
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(
+ RowType.of(
+ new BigIntType(),
+ new StringType(),
+ new IntType(),
+ new DoubleType(),
+ new BooleanType(),
+ new TinyIntType(),
+ new SmallIntType(),
+ new FloatType(),
+ new DecimalType(10, 2),
+ new TimestampType(),
+ new LocalZonedTimestampType(),
+ new BinaryType(),
+ new CharType(),
+ new StringType()));
+
+ // write data
+ Table table = getTable(tablePath);
+ List<InternalRow> writtenRows = new ArrayList<>();
+ writtenRows.addAll(writeFullTypeRows(table, 9, isPartitioned ? "p1" :
null));
+ writtenRows.addAll(writeFullTypeRows(table, 20, isPartitioned ? "p2" :
null));
+
+ // refresh table
+ table.refresh();
+ Snapshot snapshot = table.currentSnapshot();
+
+ List<Row> actual = new ArrayList<>();
+
+ LakeSource<IcebergSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ for (IcebergSplit icebergSplit :
lakeSource.createPlanner(snapshot::snapshotId).plan()) {
+ RecordReader recordReader = lakeSource.createRecordReader(() ->
icebergSplit);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ actual.addAll(
+ convertToFlinkRow(
+ fieldGetters,
+ TransformingCloseableIterator.transform(iterator,
LogRecord::getRow)));
+ iterator.close();
+ }
+ List<Row> expect =
+ convertToFlinkRow(fieldGetters,
CloseableIterator.wrap(writtenRows.iterator()));
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expect);
+
+ // test project
+ InternalRow.FieldGetter[] projectFieldGetters =
+ InternalRow.createFieldGetters(
+ RowType.of(new TinyIntType(), new StringType(), new
DoubleType()));
+ lakeSource.withProject(new int[][] {new int[] {5}, new int[] {1}, new
int[] {3}});
+
+ List<Row> projectActual = new ArrayList<>();
+ for (IcebergSplit icebergSplit :
lakeSource.createPlanner(snapshot::snapshotId).plan()) {
+ RecordReader recordReader = lakeSource.createRecordReader(() ->
icebergSplit);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ projectActual.addAll(
+ convertToFlinkRow(
+ projectFieldGetters,
+ TransformingCloseableIterator.transform(iterator,
LogRecord::getRow)));
+ iterator.close();
+ }
+
+ TableScan tableScan =
+ table.newScan()
+ .project(
+ new Schema(
+ optional(6, "tiny_int",
Types.IntegerType.get()),
+ optional(2, "name",
Types.StringType.get()),
+ optional(4, "salary",
Types.DoubleType.get())));
+ IcebergGenericReader reader = new IcebergGenericReader(tableScan,
true);
+ List<Row> projectExpect = new ArrayList<>();
+ try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ for (FileScanTask task : fileScanTasks) {
+ org.apache.iceberg.io.CloseableIterator<Record> iterator =
+ reader.open(task).iterator();
+ IcebergRecordAsFlussRow recordAsFlussRow = new
IcebergRecordAsFlussRow();
+ projectExpect.addAll(
+ convertToFlinkRow(
+ projectFieldGetters,
+ TransformingCloseableIterator.transform(
+ CloseableIterator.wrap(iterator),
+
recordAsFlussRow::replaceIcebergRecord)));
+ iterator.close();
+ }
+ }
+
assertThat(projectActual).containsExactlyInAnyOrderElementsOf(projectExpect);
+ }
+
+ private void createFullTypeLogTable(TablePath tablePath, boolean
isPartitioned, int bucketNum)
+ throws Exception {
+ // Create a schema with various data types
+ Schema schema =
+ new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(2, "name", Types.StringType.get()),
+ optional(3, "age", Types.IntegerType.get()),
+ optional(4, "salary", Types.DoubleType.get()),
+ optional(5, "is_active", Types.BooleanType.get()),
+ optional(6, "tiny_int", Types.IntegerType.get()),
+ optional(7, "small_int", Types.IntegerType.get()),
+ optional(8, "float_val", Types.FloatType.get()),
+ optional(9, "decimal_val", Types.DecimalType.of(10,
2)),
+ optional(10, "timestamp_ntz",
Types.TimestampType.withoutZone()),
+ optional(11, "timestamp_ltz",
Types.TimestampType.withZone()),
+ optional(12, "binary_data", Types.BinaryType.get()),
+ optional(13, "char_data", Types.StringType.get()),
+ optional(14, "dt", Types.StringType.get()),
+
+ // System columns
+ required(15, "__bucket", Types.IntegerType.get()),
+ required(16, "__offset", Types.LongType.get()),
+ required(17, "__timestamp",
Types.TimestampType.withZone()));
+ PartitionSpec partitionSpec =
+ isPartitioned
+ ? PartitionSpec.builderFor(schema)
+ .identity("dt")
+ .bucket("id", bucketNum)
+ .build()
+ : PartitionSpec.builderFor(schema).bucket("id",
bucketNum).build();
+ createTable(tablePath, schema, partitionSpec);
+ }
+
+ private List<InternalRow> writeFullTypeRows(
+ Table table, int rowCount, @Nullable String partition) throws
Exception {
+ List<Record> records = new ArrayList<>();
+ List<InternalRow> flussRows = new ArrayList<>();
+ for (int i = 0; i < rowCount; i++) {
+ GenericRecord record = GenericRecord.create(table.schema());
+
+ record.setField("id", (long) i);
+ record.setField("name", "name_" + i);
+ record.setField("age", 20 + (i % 30));
+ record.setField("salary", 50000.0 + (i * 1000));
+ record.setField("is_active", i % 2 == 0);
+ record.setField("tiny_int", i % 128);
+ record.setField("small_int", i % 32768);
+ record.setField("float_val", 100.5f + i);
+ record.setField("decimal_val", new BigDecimal(i + 100.25));
+ record.setField("timestamp_ntz", LocalDateTime.now());
+ record.setField("timestamp_ltz",
OffsetDateTime.now(ZoneOffset.UTC));
+ record.setField("binary_data", ByteBuffer.wrap("Hello
World!".getBytes()));
+ record.setField("char_data", "char_" + i);
+ record.setField("dt", partition);
+
+ record.setField("__bucket", 0);
+ record.setField("__offset", (long) i);
+ record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+ records.add(record);
+
+ IcebergRecordAsFlussRow row = new IcebergRecordAsFlussRow();
+ row.replaceIcebergRecord(record);
+ flussRows.add(row);
+ }
+ writeRecord(table, records, partition, 0);
+ return flussRows;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSourceTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSourceTestBase.java
new file mode 100644
index 000000000..b4d770062
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSourceTestBase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.IcebergLakeStorage;
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/** Base class for Iceberg source tests. */
+class IcebergSourceTestBase {
+ protected static final String DEFAULT_DB = "fluss_lakehouse";
+ protected static final String DEFAULT_TABLE = "test_lakehouse_table";
+ protected static final int DEFAULT_BUCKET_NUM = 1;
+
+ private static @TempDir File tempWarehouseDir;
+ protected static IcebergLakeStorage lakeStorage;
+ protected static Catalog icebergCatalog;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ Configuration configuration = new Configuration();
+ configuration.setString("warehouse", "file://" +
tempWarehouseDir.toString());
+ configuration.setString("type", "hadoop");
+ configuration.setString("name", "fluss_test_catalog");
+ lakeStorage = new IcebergLakeStorage(configuration);
+ icebergCatalog =
IcebergCatalogUtils.createIcebergCatalog(configuration);
+ }
+
+ public void createTable(TablePath tablePath, Schema schema, PartitionSpec
partitionSpec)
+ throws Exception {
+ if (!((SupportsNamespaces) icebergCatalog)
+ .namespaceExists(Namespace.of(tablePath.getDatabaseName()))) {
+ ((SupportsNamespaces) icebergCatalog)
+
.createNamespace(Namespace.of(tablePath.getDatabaseName()));
+ }
+ icebergCatalog.createTable(
+ TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName()),
+ schema,
+ partitionSpec);
+ }
+
+ public void writeRecord(
+ Table table, List<Record> records, @Nullable String partition, int
bucket)
+ throws Exception {
+ try (TaskWriter<Record> taskWriter =
+ TaskWriterFactory.createTaskWriter(table, partition, bucket)) {
+ for (Record r : records) {
+ taskWriter.write(r);
+ }
+ DataFile[] dataFiles = taskWriter.dataFiles();
+ checkState(dataFiles.length == 1);
+ table.newAppend().appendFile(dataFiles[0]).commit();
+ }
+ }
+
+ public static List<Row> convertToFlinkRow(
+ org.apache.fluss.row.InternalRow.FieldGetter[] fieldGetters,
+ CloseableIterator<InternalRow> flussRowIterator) {
+ List<Row> rows = new ArrayList<>();
+ while (flussRowIterator.hasNext()) {
+ org.apache.fluss.row.InternalRow row = flussRowIterator.next();
+ Row flinkRow = new Row(fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ flinkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ rows.add(flinkRow);
+ }
+ return rows;
+ }
+
+ public Table getTable(TablePath tablePath) throws Exception {
+ return icebergCatalog.loadTable(
+ TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName()));
+ }
+
+ public GenericRecord createIcebergRecord(Schema schema, Object... values) {
+ GenericRecord record = GenericRecord.create(schema);
+ for (int i = 0; i < values.length; i++) {
+ record.set(i, values[i]);
+ }
+ return record;
+ }
+
+ /** Adapter for transforming closeable iterator. */
+ public static class TransformingCloseableIterator<T, U> implements
CloseableIterator<U> {
+ private final CloseableIterator<T> source;
+ private final Function<? super T, ? extends U> transformer;
+
+ public TransformingCloseableIterator(
+ CloseableIterator<T> source, Function<? super T, ? extends U>
transformer) {
+ this.source = source;
+ this.transformer = transformer;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return source.hasNext();
+ }
+
+ @Override
+ public U next() {
+ return transformer.apply(source.next());
+ }
+
+ @Override
+ public void close() {
+ source.close();
+ }
+
+ public static <T, U> CloseableIterator<U> transform(
+ CloseableIterator<T> source, Function<? super T, ? extends U>
transformer) {
+ return new TransformingCloseableIterator<>(source, transformer);
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java
new file mode 100644
index 000000000..2697339e6
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergSplitPlanner}. */
+class IcebergSplitPlannerTest extends IcebergSourceTestBase {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testLogTablePlan(boolean isPartitioned) throws Exception {
+ // prepare iceberg log table
+ TablePath tablePath =
+ TablePath.of(
+ DEFAULT_DB, "log_" + (isPartitioned ? "partitioned_" :
"") + DEFAULT_TABLE);
+ Schema schema =
+ new Schema(
+ optional(1, "c1", Types.IntegerType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get()));
+ PartitionSpec partitionSpec =
+ isPartitioned
+ ?
PartitionSpec.builderFor(schema).identity("c2").bucket("c1", 2).build()
+ : PartitionSpec.builderFor(schema).bucket("c1",
2).build();
+ createTable(tablePath, schema, partitionSpec);
+
+ // write data
+ Table table = getTable(tablePath);
+ GenericRecord record1 = createIcebergRecord(schema, 12, "a", "A");
+ GenericRecord record2 = createIcebergRecord(schema, 13, "b", "B");
+
+ writeRecord(table, Collections.singletonList(record1), isPartitioned ?
"a" : null, 0);
+ writeRecord(table, Collections.singletonList(record2), isPartitioned ?
"b" : null, 1);
+
+ // refresh table
+ table.refresh();
+ Snapshot snapshot = table.currentSnapshot();
+
+ LakeSource<IcebergSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ List<IcebergSplit> icebergSplits =
lakeSource.createPlanner(snapshot::snapshotId).plan();
+ assertThat(icebergSplits.size()).isEqualTo(2);
+        // Log table is bucket-aware
+ assertThat(icebergSplits.stream().map(IcebergSplit::bucket))
+ .containsExactlyInAnyOrder(0, 1);
+ if (isPartitioned) {
+ assertThat(icebergSplits.stream().map(IcebergSplit::partition))
+ .containsExactlyInAnyOrder(
+ Collections.singletonList("a"),
Collections.singletonList("b"));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testBucketUnawareLogTablePlan(boolean isPartitioned) throws Exception
{
+        // prepare an iceberg table corresponding to a Fluss bucket-unaware
log table
+ TablePath tablePath =
+ TablePath.of(
+ DEFAULT_DB,
+ "log_"
+ + (isPartitioned ? "partitioned_" : "")
+ + DEFAULT_TABLE
+ + "_fluss_bucket");
+ Schema schema =
+ new Schema(
+ optional(1, "c1", Types.IntegerType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get()),
+ // System columns
+ required(14, BUCKET_COLUMN_NAME,
Types.IntegerType.get()),
+ required(15, OFFSET_COLUMN_NAME, Types.LongType.get()),
+ required(16, TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone()));
+ PartitionSpec partitionSpec =
+ isPartitioned
+ ? PartitionSpec.builderFor(schema)
+ .identity("c2")
+ .identity(BUCKET_COLUMN_NAME)
+ .build()
+ :
PartitionSpec.builderFor(schema).identity(BUCKET_COLUMN_NAME).build();
+ createTable(tablePath, schema, partitionSpec);
+
+ // write data
+ Table table = getTable(tablePath);
+ GenericRecord record1 =
+ createIcebergRecord(
+ schema, 12, "a", "A", 0, 100L,
OffsetDateTime.now(ZoneOffset.UTC));
+ GenericRecord record2 =
+ createIcebergRecord(
+ schema, 13, "b", "B", 1, 200L,
OffsetDateTime.now(ZoneOffset.UTC));
+
+ writeRecord(table, Collections.singletonList(record1), isPartitioned ?
"a" : null, 0);
+ writeRecord(table, Collections.singletonList(record2), isPartitioned ?
"b" : null, 1);
+
+ // refresh table
+ table.refresh();
+ Snapshot snapshot = table.currentSnapshot();
+
+ LakeSource<IcebergSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ List<IcebergSplit> icebergSplits =
lakeSource.createPlanner(snapshot::snapshotId).plan();
+ assertThat(icebergSplits.size()).isEqualTo(2);
+ // Log table is not bucket-aware
+ assertThat(icebergSplits.get(0).bucket()).isEqualTo(-1);
+ assertThat(icebergSplits.get(1).bucket()).isEqualTo(-1);
+ if (isPartitioned) {
+ assertThat(icebergSplits.stream().map(IcebergSplit::partition))
+ .containsExactlyInAnyOrder(
+ Collections.singletonList("a"),
Collections.singletonList("b"));
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializerTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializerTest.java
new file mode 100644
index 000000000..a8df50db0
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test case for {@link IcebergSplitSerializer}. */
+class IcebergSplitSerializerTest extends IcebergSourceTestBase {
+ private final IcebergSplitSerializer serializer = new
IcebergSplitSerializer();
+
+ @Test
+ void testSerializeAndDeserialize() throws Exception {
+ // prepare iceberg table
+ TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+ Schema schema =
+ new Schema(
+ optional(1, "c1", Types.IntegerType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get()));
+ PartitionSpec partitionSpec =
+ PartitionSpec.builderFor(schema).bucket("c1",
DEFAULT_BUCKET_NUM).build();
+ createTable(tablePath, schema, partitionSpec);
+
+ // write data
+ Table table = getTable(tablePath);
+
+ GenericRecord record1 = createIcebergRecord(schema, 12, "a", "A");
+ GenericRecord record2 = createIcebergRecord(schema, 13, "b", "B");
+
+ writeRecord(table, Arrays.asList(record1, record2), null, 0);
+ table.refresh();
+ Snapshot snapshot = table.currentSnapshot();
+
+ LakeSource<IcebergSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ List<IcebergSplit> plan =
lakeSource.createPlanner(snapshot::snapshotId).plan();
+
+ IcebergSplit originalIcebergSplit = plan.get(0);
+ byte[] serialized = serializer.serialize(originalIcebergSplit);
+ IcebergSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertThat(deserialized.fileScanTask().file().location())
+
.isEqualTo(originalIcebergSplit.fileScanTask().file().location());
+
assertThat(deserialized.partition()).isEqualTo(originalIcebergSplit.partition());
+
assertThat(deserialized.bucket()).isEqualTo(originalIcebergSplit.bucket());
+ }
+
+ @Test
+ void testDeserializeWithInvalidData() {
+ byte[] invalidData = "invalid".getBytes();
+ assertThatThrownBy(() -> serializer.deserialize(1, invalidData))
+ .isInstanceOf(IOException.class);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 2a216ddea..81bfb0bd0 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -80,6 +80,7 @@ import static
org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.assertj.core.api.Assertions.assertThat;
@@ -461,4 +462,32 @@ public class FlinkIcebergTieringTestBase {
writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey());
return writtenRowsByPartition;
}
+
+ protected void waitUntilBucketSynced(
+ TablePath tablePath, long tableId, int bucketCount, boolean
isPartition) {
+ if (isPartition) {
+ Map<Long, String> partitionById = waitUntilPartitions(tablePath);
+ for (Long partitionId : partitionById.keySet()) {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId,
partitionId, i);
+ waitUntilBucketSynced(tableBucket);
+ }
+ }
+ } else {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, i);
+ waitUntilBucketSynced(tableBucket);
+ }
+ }
+ }
+
+ protected void waitUntilBucketSynced(TableBucket tb) {
+ waitUntil(
+ () -> {
+ Replica replica = getLeaderReplica(tb);
+ return replica.getLogTablet().getLakeTableSnapshotId() >=
0;
+ },
+ Duration.ofMinutes(2),
+ "bucket " + tb + " not synced");
+ }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
index 9d18e9d5c..084876475 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
@@ -19,7 +19,6 @@
package org.apache.fluss.lake.iceberg.utils;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
import org.apache.fluss.metadata.TablePath;
import org.apache.iceberg.PartitionKey;
@@ -35,10 +34,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
-import java.util.Map;
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
-import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
@@ -69,11 +66,7 @@ class IcebergConversionsTest {
configuration.setString("warehouse",
tempWarehouseDir.toURI().toString());
configuration.setString("catalog-impl",
"org.apache.iceberg.inmemory.InMemoryCatalog");
configuration.setString("name", "fluss_test_catalog");
-
- Map<String, String> icebergProps = configuration.toMap();
- String catalogName = icebergProps.getOrDefault("name",
"default_iceberg_catalog");
- return buildIcebergCatalog(
- catalogName, icebergProps,
IcebergConfiguration.from(configuration).get());
+ return IcebergCatalogUtils.createIcebergCatalog(configuration);
}
private Table createIcebergTable(
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
index 67e760f57..979356fa2 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -49,6 +49,7 @@ import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
* paimon table.
*/
public class PaimonLakeSource implements LakeSource<PaimonSplit> {
+ private static final long serialVersionUID = 1L;
private final Configuration paimonConfig;
private final TablePath tablePath;
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 987bc0451..3cd682ab2 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -85,7 +85,7 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
- // can database sync job
+ // cancel the tiering job
jobClient.cancel().get();
// write some log data again
@@ -155,7 +155,7 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
assertResultsIgnoreOrder(
actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
- // can database sync job
+ // cancel the tiering job
jobClient.cancel().get();
// write some log data again
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index f2baea535..df13bb696 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -388,6 +388,7 @@
<exclude>org.apache.fluss.lake.iceberg.*</exclude>
<exclude>org.apache.fluss.row.encode.iceberg.*</exclude>
<exclude>org.apache.fluss.bucketing.IcebergBucketingFunction</exclude>
+
<exclude>org.apache.iceberg.transforms.TransformUtils</exclude>
<!-- start exclude for flink tiering
service -->
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>