This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 01a85d18e6 [iceberg] support iceberg rest catalog for iceberg
compatibility (#5817)
01a85d18e6 is described below
commit 01a85d18e659da246a6eb2de7bce4db9c952c508
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Jul 14 13:50:15 2025 +0800
[iceberg] support iceberg rest catalog for iceberg compatibility (#5817)
---
docs/content/iceberg/rest-catalog.md | 100 ++++
.../generated/iceberg_configuration.html | 4 +-
.../paimon/iceberg/IcebergCommitCallback.java | 45 +-
.../paimon/iceberg/IcebergMetadataCommitter.java | 6 +
.../org/apache/paimon/iceberg/IcebergOptions.java | 51 +-
.../paimon/iceberg/metadata/IcebergSchema.java | 5 +
.../iceberg/IcebergHiveMetadataCommitter.java | 12 +
paimon-iceberg/pom.xml | 133 ++++-
.../org/apache/iceberg/IcebergSnapshotRefType.java | 20 +-
.../IcebergRESTMetadataCommitterFactory.java | 20 +-
.../iceberg/IcebergRestMetadataCommitter.java | 430 ++++++++++++++
.../services/org.apache.paimon.factories.Factory | 16 +
.../flink/IcebergRestMetadataCommitterITCase.java | 128 ++++
.../iceberg/IcebergRestMetadataCommitterTest.java | 641 +++++++++++++++++++++
14 files changed, 1557 insertions(+), 54 deletions(-)
diff --git a/docs/content/iceberg/rest-catalog.md
b/docs/content/iceberg/rest-catalog.md
new file mode 100644
index 0000000000..177292f9e8
--- /dev/null
+++ b/docs/content/iceberg/rest-catalog.md
@@ -0,0 +1,100 @@
+---
+title: "REST Catalog"
+weight: 5
+type: docs
+aliases:
+- /iceberg/rest-catalog.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# REST Catalog
+
+When creating a Paimon table, set `'metadata.iceberg.storage' = 'rest-catalog'`.
+With this option, Paimon will not only store Iceberg metadata like hadoop-catalog, but also create the table in an [Iceberg REST catalog](https://iceberg.apache.org/terms/#decoupling-using-the-rest-catalog).
+The Paimon table can then be accessed from the Iceberg REST catalog.
+
+You need to provide the REST catalog configuration by setting options prefixed with `'metadata.iceberg.rest.'`, such as
+`'metadata.iceberg.rest.uri' = 'https://localhost/'`. Paimon strips this prefix and uses the remaining options to initialize an Iceberg REST catalog,
+which it then uses to commit metadata.
+
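+For example, the options below (a sketch with a hypothetical table name and URI) are stripped of the prefix and forwarded to the REST catalog as `uri`, `warehouse` and `clients`; which property keys your REST catalog service actually accepts depends on that service:
+
+```sql
+CREATE TABLE `paimon`.`default`.`T_sketch` (k INT, v INT) WITH (
+    'metadata.iceberg.storage' = 'rest-catalog',
+    'metadata.iceberg.rest.uri' = 'http://localhost:8181/',   -- forwarded as 'uri'
+    'metadata.iceberg.rest.warehouse' = 'my_warehouse',       -- forwarded as 'warehouse'
+    'metadata.iceberg.rest.clients' = '1'                      -- forwarded as 'clients'
+);
+```
+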
+**Dependency:**
+
+This feature requires the dependency [paimon-iceberg-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-iceberg/{{< version >}}/), and JDK 11 or higher.
+
+You can also build the jar manually from the source code (JDK 11+ is required).
+
+To build from source code, [clone the git repository]({{< github_repo >}}).
+
+Build the bundled jar with the following command:
+- `mvn clean install -DskipTests`
+
+You can find the jar in `./paimon-iceberg/target/paimon-iceberg-{{< version
>}}.jar`.
+
+**Example:**
+
+Here is an example using Flink SQL:
+```sql
+-- create a paimon table
+CREATE TABLE `paimon`.`default`.`T` (
+ pt INT,
+ k INT,
+ v INT,
+ PRIMARY KEY (pt, k) NOT ENFORCED
+) PARTITIONED BY (pt) WITH (
+ 'metadata.iceberg.storage' = 'rest-catalog',
+ 'metadata.iceberg.rest.uri' = 'http://localhost:55807/',
+ 'metadata.iceberg.rest.warehouse' = 'rck_warehouse',
+ 'metadata.iceberg.rest.clients' = '1'
+);
+
+-- insert some data
+INSERT INTO `paimon`.`default`.`T` VALUES(1, 9, 90),(1, 10, 100),(1, 11,
110),(2, 20, 200);
+
+-- create an iceberg rest catalog
+CREATE CATALOG `iceberg` WITH (
+ 'type' = 'iceberg',
+ 'catalog-type' = 'rest',
+ 'uri' = 'http://localhost:55807/',
+ 'clients' = '1',
+ 'cache-enabled' = 'false'
+);
+
+-- verify the data in iceberg rest-catalog
+SELECT v, k, pt FROM `iceberg`.`default`.`T` ORDER BY pt, k;
+/*
+the query results:
+ 90, 9, 1
+100, 10, 1
+110, 11, 1
+200, 20, 2
+*/
+```
+**Note:**
+
+Paimon first writes Iceberg metadata to a separate directory, like hadoop-catalog, and then commits the metadata to the Iceberg REST catalog.
+If the two are incompatible, the metadata stored in the separate directory is taken as the reference.
+
+There are several cases when committing to the Iceberg REST catalog:
+1. The table does not exist in the Iceberg REST catalog. The table is first created in the REST catalog, and the metadata is then committed.
+2. The table exists in the Iceberg REST catalog and is compatible with the base metadata stored in the separate directory. The table is loaded directly and the metadata is committed incrementally (see the example after this list).
+3. The table exists, its last-sequence-number is 0 and its current-snapshot-id is -1. The table is treated as a new table: it is loaded directly and the metadata is committed.
+4. The table exists but is not compatible with the base metadata stored in the separate directory. The table is **dropped and recreated**, and the metadata is then committed.
+
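+For case 2, with the table from the example above, a further write commits incrementally on top of the existing REST table. This is only a sketch reusing the `paimon` and `iceberg` catalogs defined earlier; the inserted values (pt=2, k=21, v=210) are illustrative:
+
+```sql
+-- a second write to the Paimon table; its metadata is committed on top of the
+-- existing table in the Iceberg REST catalog instead of recreating it
+INSERT INTO `paimon`.`default`.`T` VALUES(2, 21, 210);
+
+-- the new row is visible through the Iceberg REST catalog
+SELECT v, k, pt FROM `iceberg`.`default`.`T` WHERE pt = 2 ORDER BY k;
+```
+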
diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html
b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index 30c05ef9b5..cf57763779 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -102,13 +102,13 @@ under the License.
<td><h5>metadata.iceberg.previous-versions-max</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
- <td>The number of old metadata files to keep after each table
commit</td>
+            <td>The number of old metadata files to keep after each table commit. For rest-catalog, at least 1 old metadata file will be kept.</td>
</tr>
<tr>
<td><h5>metadata.iceberg.storage</h5></td>
<td style="word-wrap: break-word;">disabled</td>
<td><p>Enum</p></td>
- <td>When set, produce Iceberg metadata after a snapshot is
committed, so that Iceberg readers can read Paimon's raw data files.<br /><br
/>Possible values:<ul><li>"disabled": Disable Iceberg compatibility
support.</li><li>"table-location": Store Iceberg metadata in each table's
directory.</li><li>"hadoop-catalog": Store Iceberg metadata in a separate
directory. This directory can be specified as the warehouse directory of an
Iceberg Hadoop catalog.</li><li>"hive-catalog": Not [...]
+ <td>When set, produce Iceberg metadata after a snapshot is
committed, so that Iceberg readers can read Paimon's raw data files.<br /><br
/>Possible values:<ul><li>"disabled": Disable Iceberg compatibility
support.</li><li>"table-location": Store Iceberg metadata in each table's
directory.</li><li>"hadoop-catalog": Store Iceberg metadata in a separate
directory. This directory can be specified as the warehouse directory of an
Iceberg Hadoop catalog.</li><li>"hive-catalog": Not [...]
</tr>
<tr>
<td><h5>metadata.iceberg.storage-location</h5></td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index a53f132ead..ccff0fd162 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -90,6 +90,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -208,6 +209,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
return IcebergOptions.StorageLocation.TABLE_LOCATION;
case HIVE_CATALOG:
case HADOOP_CATALOG:
+ case REST_CATALOG:
return IcebergOptions.StorageLocation.CATALOG_STORAGE;
default:
throw new UnsupportedOperationException(
@@ -349,6 +351,11 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
entry -> new
IcebergRef(entry.getKey().id())));
String tableUuid = UUID.randomUUID().toString();
+
+ List<IcebergSchema> allSchemas =
+ IntStream.rangeClosed(0, schemaId)
+ .mapToObj(schemaCache::get)
+ .collect(Collectors.toList());
IcebergMetadata metadata =
new IcebergMetadata(
formatVersion,
@@ -356,7 +363,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
table.location().toString(),
snapshotId,
icebergSchema.highestFieldId(),
- Collections.singletonList(icebergSchema),
+ allSchemas,
schemaId,
Collections.singletonList(new
IcebergPartitionSpec(partitionFields)),
partitionFields.stream()
@@ -379,7 +386,17 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
expireAllBefore(snapshotId);
if (metadataCommitter != null) {
- metadataCommitter.commitMetadata(metadataPath, null);
+ switch (metadataCommitter.identifier()) {
+ case "hive":
+ metadataCommitter.commitMetadata(metadataPath, null);
+ break;
+ case "rest":
+ metadataCommitter.commitMetadata(metadata, null);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported metadata committer: " +
metadataCommitter.identifier());
+ }
}
}
@@ -557,14 +574,22 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
newDVManifestFileMetas.stream())
.collect(Collectors.toList()));
- // add new schema if needed
+ // add new schemas if needed
SchemaCache schemaCache = new SchemaCache();
int schemaId = (int) schemaCache.getLatestSchemaId();
IcebergSchema icebergSchema = schemaCache.get(schemaId);
List<IcebergSchema> schemas = baseMetadata.schemas();
if (baseMetadata.currentSchemaId() != schemaId) {
+ Preconditions.checkArgument(
+ schemaId > baseMetadata.currentSchemaId(),
+ "currentSchemaId{%s} in paimon should be greater than
currentSchemaId{%s} in base metadata.",
+ schemaId,
+ baseMetadata.currentSchemaId());
schemas = new ArrayList<>(schemas);
- schemas.add(icebergSchema);
+ schemas.addAll(
+ IntStream.rangeClosed(baseMetadata.currentSchemaId() + 1,
schemaId)
+ .mapToObj(schemaCache::get)
+ .collect(Collectors.toList()));
}
List<IcebergSnapshot> snapshots = new
ArrayList<>(baseMetadata.snapshots());
@@ -619,7 +644,17 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
}
if (metadataCommitter != null) {
- metadataCommitter.commitMetadata(metadataPath, baseMetadataPath);
+ switch (metadataCommitter.identifier()) {
+ case "hive":
+ metadataCommitter.commitMetadata(metadataPath,
baseMetadataPath);
+ break;
+ case "rest":
+ metadataCommitter.commitMetadata(metadata, baseMetadata);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported metadata committer: " +
metadataCommitter.identifier());
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
index 2fe1314983..64ecb96062 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.iceberg;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import javax.annotation.Nullable;
@@ -28,5 +29,10 @@ import javax.annotation.Nullable;
*/
public interface IcebergMetadataCommitter {
+ String identifier();
+
void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath);
+
+ void commitMetadata(
+ IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata
baseIcebergMetadata);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 2fe9abba26..f371db825c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -20,15 +20,22 @@ package org.apache.paimon.iceberg;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.options.description.TextElement;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.paimon.options.ConfigOptions.key;
/** Config options for Paimon Iceberg compatibility. */
public class IcebergOptions {
+ public static final String REST_CONFIG_PREFIX = "metadata.iceberg.rest.";
+
public static final ConfigOption<StorageType> METADATA_ICEBERG_STORAGE =
key("metadata.iceberg.storage")
.enumType(StorageType.class)
@@ -79,7 +86,8 @@ public class IcebergOptions {
.intType()
.defaultValue(0)
.withDescription(
- "The number of old metadata files to keep after
each table commit");
+                                "The number of old metadata files to keep after each table commit. "
+                                        + "For rest-catalog, at least 1 old metadata file will be kept.");
public static final ConfigOption<String> URI =
key("metadata.iceberg.uri")
@@ -147,6 +155,41 @@ public class IcebergOptions {
.defaultValue(false)
.withDescription("Skip updating Hive stats.");
+ private final Options options;
+
+ public IcebergOptions(Map<String, String> options) {
+ this(Options.fromMap(options));
+ }
+
+ public IcebergOptions(Options options) {
+ this.options = options;
+ }
+
+ public Map<String, String> icebergRestConfig() {
+ Map<String, String> restConfig = new HashMap<>();
+ options.keySet()
+ .forEach(
+ key -> {
+ if (key.startsWith(REST_CONFIG_PREFIX)) {
+ String restConfigKey =
key.substring(REST_CONFIG_PREFIX.length());
+ Preconditions.checkArgument(
+ !restConfigKey.isEmpty(),
+ "config key '%s' for iceberg rest
catalog is empty!",
+ key);
+ restConfig.put(restConfigKey,
options.get(key));
+ }
+ });
+ return restConfig;
+ }
+
+ public boolean deleteAfterCommitEnabled() {
+ return options.get(METADATA_DELETE_AFTER_COMMIT);
+ }
+
+ public int previousVersionsMax() {
+ return options.get(METADATA_PREVIOUS_VERSIONS_MAX);
+ }
+
/** Where to store Iceberg metadata. */
public enum StorageType implements DescribedEnum {
DISABLED("disabled", "Disable Iceberg compatibility support."),
@@ -158,7 +201,11 @@ public class IcebergOptions {
HIVE_CATALOG(
"hive-catalog",
"Not only store Iceberg metadata like hadoop-catalog, "
- + "but also create Iceberg external table in Hive.");
+ + "but also create Iceberg external table in Hive."),
+ REST_CATALOG(
+ "rest-catalog",
+ "Store Iceberg metadata in a REST catalog. "
+ + "This allows integration with Iceberg REST catalog
services.");
private final String value;
private final String description;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
index ff28a3bfd2..6c18eed7ab 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
@@ -19,6 +19,7 @@
package org.apache.paimon.iceberg.metadata;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -93,6 +94,10 @@ public class IcebergSchema {
return fields.stream().mapToInt(IcebergDataField::id).max().orElse(0);
}
+ public String toJson() {
+ return JsonSerdeUtil.toJson(this);
+ }
+
@Override
public int hashCode() {
return Objects.hash(type, schemaId, fields);
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
index 6bfe86ad29..944cc94398 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.HiveTypeUtils;
import org.apache.paimon.hive.pool.CachedClientPool;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
@@ -117,6 +118,11 @@ public class IcebergHiveMetadataCommitter implements
IcebergMetadataCommitter {
hiveConf, options,
options.getString(IcebergOptions.HIVE_CLIENT_CLASS));
}
+ @Override
+ public String identifier() {
+ return "hive";
+ }
+
@Override
public void commitMetadata(Path newMetadataPath, @Nullable Path
baseMetadataPath) {
try {
@@ -126,6 +132,12 @@ public class IcebergHiveMetadataCommitter implements
IcebergMetadataCommitter {
}
}
+ @Override
+ public void commitMetadata(
+ IcebergMetadata icebergMetadata, @Nullable IcebergMetadata
baseIcebergMetadata) {
+ throw new UnsupportedOperationException();
+ }
+
private void commitMetadataImpl(Path newMetadataPath, @Nullable Path
baseMetadataPath)
throws Exception {
if (!databaseExists(icebergHiveDatabase)) {
diff --git a/paimon-iceberg/pom.xml b/paimon-iceberg/pom.xml
index f9df0801d0..c3a7c3aa45 100644
--- a/paimon-iceberg/pom.xml
+++ b/paimon-iceberg/pom.xml
@@ -45,7 +45,13 @@ under the License.
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -76,6 +82,51 @@ under the License.
<scope>test</scope>
</dependency>
+ <!-- iceberg dependency -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-open-api</artifactId>
+ <version>${iceberg.version}</version>
+ <classifier>test-fixtures</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- hadoop dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -88,7 +139,7 @@ under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
@@ -118,37 +169,46 @@ under the License.
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <version>1.21</version>
- <scope>test</scope>
- </dependency>
-
- <!-- iceberg dependency -->
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-core</artifactId>
- <version>${iceberg.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-data</artifactId>
- <version>${iceberg.version}</version>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
- <artifactId>parquet-hadoop</artifactId>
- <groupId>org.apache.parquet</groupId>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
- <version>${iceberg.version}</version>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.21</version>
<scope>test</scope>
</dependency>
@@ -182,6 +242,29 @@ under the License.
<scope>test</scope>
</dependency>
+ <!-- web test dependency -->
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>11.0.24</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>11.0.24</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ <version>6.1.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
b/paimon-iceberg/src/main/java/org/apache/iceberg/IcebergSnapshotRefType.java
similarity index 70%
copy from
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
copy to
paimon-iceberg/src/main/java/org/apache/iceberg/IcebergSnapshotRefType.java
index 2fe1314983..02343a1173 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
+++
b/paimon-iceberg/src/main/java/org/apache/iceberg/IcebergSnapshotRefType.java
@@ -16,17 +16,15 @@
* limitations under the License.
*/
-package org.apache.paimon.iceberg;
+package org.apache.iceberg;
-import org.apache.paimon.fs.Path;
+/** Helper to expose Iceberg's {@link SnapshotRefType} values outside the {@code org.apache.iceberg} package. */
+public class IcebergSnapshotRefType {
+ public static SnapshotRefType branchType() {
+ return SnapshotRefType.BRANCH;
+ }
-import javax.annotation.Nullable;
-
-/**
- * Commit Iceberg metadata to metastore. Each kind of Iceberg catalog should
have its own
- * implementation.
- */
-public interface IcebergMetadataCommitter {
-
- void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath);
+ public static SnapshotRefType tagType() {
+ return SnapshotRefType.TAG;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRESTMetadataCommitterFactory.java
similarity index 67%
copy from
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
copy to
paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRESTMetadataCommitterFactory.java
index 2fe1314983..e73a883f47 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
+++
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRESTMetadataCommitterFactory.java
@@ -18,15 +18,17 @@
package org.apache.paimon.iceberg;
-import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
-import javax.annotation.Nullable;
+/** Factory to create {@link IcebergRestMetadataCommitter}. */
+public class IcebergRESTMetadataCommitterFactory implements
IcebergMetadataCommitterFactory {
+ @Override
+ public String identifier() {
+ return IcebergOptions.StorageType.REST_CATALOG.toString();
+ }
-/**
- * Commit Iceberg metadata to metastore. Each kind of Iceberg catalog should
have its own
- * implementation.
- */
-public interface IcebergMetadataCommitter {
-
- void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath);
+ @Override
+ public IcebergMetadataCommitter create(FileStoreTable table) {
+ return new IcebergRestMetadataCommitter(table);
+ }
}
diff --git
a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
new file mode 100644
index 0000000000..1fa0db2441
--- /dev/null
+++
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static
org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
+import static
org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
+
+/**
+ * Commit Iceberg metadata to Iceberg's REST catalog, so that the table can be accessed through the
+ * REST catalog.
+ */
+public class IcebergRestMetadataCommitter implements IcebergMetadataCommitter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergRestMetadataCommitter.class);
+
+ private static final String REST_CATALOG_NAME = "rest-catalog";
+
+ private final RESTCatalog restCatalog;
+ private final String icebergDatabaseName;
+ private final TableIdentifier icebergTableIdentifier;
+ private final IcebergOptions icebergOptions;
+
+ private Table icebergTable;
+
+ public IcebergRestMetadataCommitter(FileStoreTable table) {
+ Options options = new Options(table.options());
+ icebergOptions = new IcebergOptions(options);
+
+ Identifier identifier =
Preconditions.checkNotNull(table.catalogEnvironment().identifier());
+ String icebergDatabase =
options.get(IcebergOptions.METASTORE_DATABASE);
+ String icebergTable = options.get(IcebergOptions.METASTORE_TABLE);
+ this.icebergDatabaseName =
+ icebergDatabase != null && !icebergDatabase.isEmpty()
+ ? icebergDatabase
+ : identifier.getDatabaseName();
+ String icebergTableName =
+ icebergTable != null && !icebergTable.isEmpty()
+ ? icebergTable
+ : identifier.getTableName();
+ this.icebergTableIdentifier =
+ TableIdentifier.of(Namespace.of(icebergDatabaseName),
icebergTableName);
+
+ Map<String, String> restConfigs = icebergOptions.icebergRestConfig();
+
+ try {
+ Configuration hadoopConf = new Configuration();
+
hadoopConf.setClassLoader(IcebergRestMetadataCommitter.class.getClassLoader());
+
+ this.restCatalog = initRestCatalog(restConfigs, hadoopConf);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to initialize iceberg rest
catalog.", e);
+ }
+ }
+
+ @Override
+ public String identifier() {
+ return "rest";
+ }
+
+ @Override
+ public void commitMetadata(Path newMetadataPath, @Nullable Path
baseMetadataPath) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void commitMetadata(
+ IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata
baseIcebergMetadata) {
+ try {
+ commitMetadataImpl(newIcebergMetadata, baseIcebergMetadata);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void commitMetadataImpl(
+ IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata
baseIcebergMetadata) {
+
+ newIcebergMetadata = adjustMetadataForRest(newIcebergMetadata);
+ TableMetadata newMetadata =
TableMetadataParser.fromJson(newIcebergMetadata.toJson());
+
+ // updates to be committed
+ TableMetadata.Builder updatdeBuilder;
+
+ // create database if not exist
+ if (!databaseExists()) {
+ createDatabase();
+ }
+
+ try {
+ if (!tableExists()) {
+ LOG.info("Table {} does not exist, create it.",
icebergTableIdentifier);
+ icebergTable = createTable();
+ updatdeBuilder =
+ updatesForCorrectBase(
+ ((BaseTable)
icebergTable).operations().current(),
+ newMetadata,
+ true);
+ } else {
+ icebergTable = getTable();
+
+ TableMetadata metadata = ((BaseTable)
icebergTable).operations().current();
+ boolean withBase = checkBase(metadata, newMetadata,
baseIcebergMetadata);
+ if (withBase) {
+ LOG.info("create updates with base metadata.");
+ updatdeBuilder = updatesForCorrectBase(metadata,
newMetadata, false);
+ } else {
+ LOG.info(
+ "create updates without base metadata.
currentSnapshotId for base metadata: {}, for new metadata:{}",
+ metadata.currentSnapshot().snapshotId(),
+ newMetadata.currentSnapshot().snapshotId());
+ updatdeBuilder = updatesForIncorrectBase(newMetadata);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Fail to create table or get table: " +
icebergTableIdentifier, e);
+ }
+
+ TableMetadata updatedForCommit = updatdeBuilder.build();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updates:{}",
updatesToString(updatedForCommit.changes()));
+ }
+
+ try {
+ ((BaseTable) icebergTable)
+ .operations()
+ .commit(((BaseTable) icebergTable).operations().current(),
updatedForCommit);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to commit metadata to rest
catalog.", e);
+ }
+ }
+
+ private TableMetadata.Builder updatesForCorrectBase(
+ TableMetadata base, TableMetadata newMetadata, boolean isNewTable)
{
+ TableMetadata.Builder updateBuilder = TableMetadata.buildFrom(base);
+
+ int schemaId = icebergTable.schema().schemaId();
+ if (isNewTable) {
+ Preconditions.checkArgument(
+ schemaId == 0,
+ "the schema id for newly created iceberg table should be
0, but is %s",
+ schemaId);
+ // add all schemas
+ addAndSetCurrentSchema(
+ newMetadata.schemas(), newMetadata.currentSchemaId(),
updateBuilder);
+ updateBuilder.addPartitionSpec(newMetadata.spec());
+ updateBuilder.setDefaultPartitionSpec(newMetadata.defaultSpecId());
+
+ // add snapshot
+ addNewSnapshot(newMetadata.currentSnapshot(), updateBuilder);
+
+ } else {
+ // add new schema if needed
+ Preconditions.checkArgument(
+ newMetadata.currentSchemaId() >= schemaId,
+ "the new metadata has correct base, but the schemaId(%s)
in iceberg table "
+ + "is greater than currentSchemaId(%s) in new
metadata.",
+ schemaId,
+ newMetadata.currentSchemaId());
+ if (newMetadata.currentSchemaId() != schemaId) {
+ addAndSetCurrentSchema(
+ newMetadata.schemas().stream()
+ .filter(schema -> schema.schemaId() > schemaId)
+ .collect(Collectors.toList()),
+ newMetadata.currentSchemaId(),
+ updateBuilder);
+ }
+
+ // add snapshot
+ addNewSnapshot(newMetadata.currentSnapshot(), updateBuilder);
+
+ // remove snapshots not in new metadata
+ Set<Long> snapshotIdsToRemove = new HashSet<>();
+ icebergTable
+ .snapshots()
+ .forEach(snapshot ->
snapshotIdsToRemove.add(snapshot.snapshotId()));
+ Set<Long> snapshotIdsInNewMetadata =
+ newMetadata.snapshots().stream()
+ .map(Snapshot::snapshotId)
+ .collect(Collectors.toSet());
+ snapshotIdsToRemove.removeAll(snapshotIdsInNewMetadata);
+ removeSnapshots(snapshotIdsToRemove, updateBuilder);
+ }
+
+ return updateBuilder;
+ }
+
+ private TableMetadata.Builder updatesForIncorrectBase(TableMetadata
newMetadata) {
+ LOG.info("the base metadata is incorrect, we'll recreate the iceberg
table.");
+ icebergTable = recreateTable();
+ return updatesForCorrectBase(
+ ((BaseTable) icebergTable).operations().current(),
newMetadata, true);
+ }
+
+ private RESTCatalog initRestCatalog(Map<String, String> restConfigs,
Configuration conf) {
+ restConfigs.put(ICEBERG_CATALOG_TYPE, "rest");
+ Catalog catalog = CatalogUtil.buildIcebergCatalog(REST_CATALOG_NAME,
restConfigs, conf);
+ return (RESTCatalog) catalog;
+ }
+
+ //
-------------------------------------------------------------------------------------
+ // rest catalog invoke
+ //
-------------------------------------------------------------------------------------
+
+ private boolean databaseExists() {
+ return restCatalog.namespaceExists(Namespace.of(icebergDatabaseName));
+ }
+
+ private boolean tableExists() {
+ return restCatalog.tableExists(icebergTableIdentifier);
+ }
+
+ private void createDatabase() {
+ restCatalog.createNamespace(Namespace.of(icebergDatabaseName));
+ }
+
+ private Table createTable() {
+        /* Here we create the iceberg table with an empty schema. This is because:
+        when creating a table, field ids in iceberg are forced to start from 1, while field ids in paimon usually start from 0.
+        If we directly used the schema extracted from paimon to create the iceberg table, the field ids would be out of order,
+        which may cause incorrect results when reading with an iceberg reader. So we use an empty schema here, and add the
+        corresponding schemas later.
+        */
+ Schema emptySchema = new Schema();
+ return restCatalog.createTable(icebergTableIdentifier, emptySchema);
+ }
+
+ private Table getTable() {
+ return restCatalog.loadTable(icebergTableIdentifier);
+ }
+
+ private void dropTable() {
+ // set purge to false, because we don't need to delete the data files
+ restCatalog.dropTable(icebergTableIdentifier, false);
+ }
+
+ private Table recreateTable() {
+ try {
+ dropTable();
+ return createTable();
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to recreate iceberg table.", e);
+ }
+ }
+
+ //
-------------------------------------------------------------------------------------
+ // metadata updates
+ //
-------------------------------------------------------------------------------------
+
+    // add a new snapshot and set it as the current snapshot
+ private void addNewSnapshot(Snapshot newSnapshot, TableMetadata.Builder
update) {
+ update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
+ }
+
+ // remove snapshots recorded in table metadata
+ private void removeSnapshots(Set<Long> snapshotIds, TableMetadata.Builder
update) {
+ update.removeSnapshots(snapshotIds);
+ }
+
+ // add schemas and set the current schema id
+ private void addAndSetCurrentSchema(
+ List<Schema> schemas, int currentSchemaId, TableMetadata.Builder
update) {
+ for (Schema schema : schemas) {
+ update.addSchema(schema);
+ }
+ update.setCurrentSchema(currentSchemaId);
+
+ // update properties
+ Map<String, String> properties = new HashMap<>();
+ properties.put(
+ METADATA_PREVIOUS_VERSIONS_MAX,
+ String.valueOf(icebergOptions.previousVersionsMax()));
+ properties.put(
+ METADATA_DELETE_AFTER_COMMIT_ENABLED,
+ String.valueOf(icebergOptions.deleteAfterCommitEnabled()));
+ update.setProperties(properties);
+ }
+
+ //
-------------------------------------------------------------------------------------
+ // Utils
+ //
-------------------------------------------------------------------------------------
+
+ /**
+ * @param currentMetadata the current metadata used by iceberg table
+ * @param newMetadata the new metadata to be committed
+ * @param baseIcebergMetadata the base metadata previously written by
paimon
+ * @return whether the iceberg table has base metadata
+ */
+ private static boolean checkBase(
+ TableMetadata currentMetadata,
+ TableMetadata newMetadata,
+ @Nullable IcebergMetadata baseIcebergMetadata) {
+ // take the base metadata from IcebergCommitCallback as the first
reference
+ if (baseIcebergMetadata == null) {
+ LOG.info(
+                    "creating updates without base metadata because the base metadata from upstream is null.");
+ return false;
+ }
+
+        // if the iceberg table exists, check whether the table's current metadata is the
+        // base of the new table metadata; we use the current snapshot id to check
+ return currentMetadata.currentSnapshot().snapshotId()
+ == newMetadata.currentSnapshot().snapshotId() - 1;
+ }
+
+ private IcebergMetadata adjustMetadataForRest(IcebergMetadata
newIcebergMetadata) {
+        // Why this is needed:
+        // Since we create the iceberg table in the rest catalog with an empty schema and schema id 0
+        // is occupied by that empty schema, there is a 1-unit offset between the schema ids in the
+        // metadata stored in the rest catalog and the schema ids in paimon.
+
+ List<IcebergSchema> schemas =
+ newIcebergMetadata.schemas().stream()
+ .map(schema -> new IcebergSchema(schema.schemaId() +
1, schema.fields()))
+ .collect(Collectors.toList());
+ int currentSchemaId = newIcebergMetadata.currentSchemaId() + 1;
+ List<IcebergSnapshot> snapshots =
+ newIcebergMetadata.snapshots().stream()
+ .map(
+ snapshot ->
+ new IcebergSnapshot(
+ snapshot.sequenceNumber(),
+ snapshot.snapshotId(),
+ snapshot.timestampMs(),
+ snapshot.summary(),
+ snapshot.manifestList(),
+ snapshot.schemaId() + 1))
+ .collect(Collectors.toList());
+ return new IcebergMetadata(
+ newIcebergMetadata.formatVersion(),
+ newIcebergMetadata.tableUuid(),
+ newIcebergMetadata.location(),
+ newIcebergMetadata.currentSnapshotId(),
+ newIcebergMetadata.lastColumnId(),
+ schemas,
+ currentSchemaId,
+ newIcebergMetadata.partitionSpecs(),
+ newIcebergMetadata.lastPartitionId(),
+ snapshots,
+ newIcebergMetadata.currentSnapshotId(),
+ newIcebergMetadata.refs());
+ }
+
+ private static String updateToString(MetadataUpdate update) {
+ if (update instanceof MetadataUpdate.AddSnapshot) {
+ return String.format(
+ "AddSnapshot(%s)",
+ ((MetadataUpdate.AddSnapshot)
update).snapshot().snapshotId());
+ } else if (update instanceof MetadataUpdate.RemoveSnapshot) {
+ return String.format(
+ "RemoveSnapshot(%s)", ((MetadataUpdate.RemoveSnapshot)
update).snapshotId());
+ } else if (update instanceof MetadataUpdate.SetSnapshotRef) {
+ return String.format(
+ "SetSnapshotRef(%s, %s, %s)",
+ ((MetadataUpdate.SetSnapshotRef) update).name(),
+ ((MetadataUpdate.SetSnapshotRef) update).type(),
+ ((MetadataUpdate.SetSnapshotRef) update).snapshotId());
+ } else if (update instanceof MetadataUpdate.AddSchema) {
+ return String.format(
+ "AddSchema(%s)", ((MetadataUpdate.AddSchema)
update).schema().schemaId());
+ } else if (update instanceof MetadataUpdate.SetCurrentSchema) {
+ return String.format(
+ "SetCurrentSchema(%s)", ((MetadataUpdate.SetCurrentSchema)
update).schemaId());
+ } else if (update instanceof MetadataUpdate.SetProperties) {
+ return String.format(
+ "SetProperties(%s)", ((MetadataUpdate.SetProperties)
update).updated());
+ } else {
+ return update.toString();
+ }
+ }
+
+ private static String updatesToString(List<MetadataUpdate> updates) {
+ return updates.stream()
+ .map(IcebergRestMetadataCommitter::updateToString)
+ .collect(Collectors.joining(", "));
+ }
+}
diff --git
a/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
new file mode 100644
index 0000000000..5820876fb0
--- /dev/null
+++
b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.iceberg.IcebergRESTMetadataCommitterFactory
diff --git
a/paimon-iceberg/src/test/java/org/apache/paimon/flink/IcebergRestMetadataCommitterITCase.java
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/IcebergRestMetadataCommitterITCase.java
new file mode 100644
index 0000000000..72de210ba7
--- /dev/null
+++
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/IcebergRestMetadataCommitterITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.iceberg.IcebergRestMetadataCommitter;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTCatalogServer;
+import org.apache.iceberg.rest.RESTServerExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link IcebergRestMetadataCommitter}. */
+public class IcebergRestMetadataCommitterITCase extends AbstractTestBase {
+
+ @RegisterExtension
+ private static final RESTServerExtension REST_SERVER_EXTENSION =
+ new RESTServerExtension(
+ Map.of(
+ RESTCatalogServer.REST_PORT,
+ RESTServerExtension.FREE_PORT,
+ CatalogProperties.CLIENT_POOL_SIZE,
+ "1",
+ CatalogProperties.CATALOG_IMPL,
+ HadoopCatalog.class.getName()));
+
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "parquet", "orc"})
+ public void testCommitToRestCatalog(String fileFormat) throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
tableEnvironmentBuilder().batchMode().parallelism(2).build();
+
+ RESTCatalog restCatalog = REST_SERVER_EXTENSION.client();
+ String restUri = restCatalog.properties().get(CatalogProperties.URI);
+ String restWarehouse =
restCatalog.properties().get(CatalogProperties.WAREHOUSE_LOCATION);
+ String restClients =
restCatalog.properties().get(CatalogProperties.CLIENT_POOL_SIZE);
+
+ tEnv.executeSql(
+ "CREATE CATALOG paimon WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE paimon.`default`.T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'metadata.iceberg.storage' =
'rest-catalog',\n"
+ + " 'metadata.iceberg.rest.uri' = '%s',\n"
+ + " 'metadata.iceberg.rest.warehouse' =
'%s',\n"
+ + " 'metadata.iceberg.rest.clients' = '%s',\n"
+ + " 'file.format' = '%s'\n"
+ + ")",
+ restUri, restWarehouse, restClients, fileFormat));
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 9, 90), "
+ + "(1, 10, 100), "
+ + "(1, 11, 110), "
+ + "(2, 20, 200)")
+ .await();
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG iceberg WITH (\n"
+ + " 'type' = 'iceberg',\n"
+ + " 'catalog-type' = 'rest',\n"
+ + " 'uri' = '%s',\n"
+ + " 'clients' = '%s',\n"
+ + " 'cache-enabled' = 'false'\n"
+ + ")",
+ restUri, restClients));
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT v, k, pt FROM
iceberg.`default`.T ORDER BY pt, k")))
+ .containsExactly(
+ Row.of(90, 9, 1),
+ Row.of(100, 10, 1),
+ Row.of(110, 11, 1),
+ Row.of(200, 20, 2));
+ }
+
+ private List<Row> collect(TableResult result) throws Exception {
+ List<Row> rows = new ArrayList<>();
+ try (CloseableIterator<Row> it = result.collect()) {
+ while (it.hasNext()) {
+ rows.add(it.next());
+ }
+ }
+ return rows;
+ }
+}
diff --git
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
new file mode 100644
index 0000000000..a2e3f40bb9
--- /dev/null
+++
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTCatalogServer;
+import org.apache.iceberg.rest.RESTServerExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static
org.apache.paimon.iceberg.IcebergCommitCallback.catalogTableMetadataPath;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IcebergRestMetadataCommitter}. */
+public class IcebergRestMetadataCommitterTest {
+
+ @RegisterExtension
+ private static final RESTServerExtension REST_SERVER_EXTENSION =
+ new RESTServerExtension(
+ Map.of(
+ RESTCatalogServer.REST_PORT,
+ RESTServerExtension.FREE_PORT,
+ CatalogProperties.CLIENT_POOL_SIZE,
+ "1",
+ CatalogProperties.CATALOG_IMPL,
+ HadoopCatalog.class.getName()));
+
+ @TempDir public java.nio.file.Path tempDir;
+
+ protected static RESTCatalog restCatalog;
+
+ @BeforeEach
+ public void setUp() {
+ restCatalog = REST_SERVER_EXTENSION.client();
+ }
+
+ @Test
+ public void testUnPartitionedPrimaryKeyTable() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.BIGINT()
+ },
+ new String[] {"k1", "k2", "v1", "v2"});
+
+ int numRounds = 20;
+ int numRecords = 1000;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<List<TestRecord>> testRecords = new ArrayList<>();
+ List<List<String>> expected = new ArrayList<>();
+ Map<String, String> expectedMap = new LinkedHashMap<>();
+ for (int r = 0; r < numRounds; r++) {
+ List<TestRecord> round = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ int k1 = random.nextInt(0, 100);
+ String k2 = String.valueOf(random.nextInt(1000, 1010));
+ int v1 = random.nextInt();
+ long v2 = random.nextLong();
+ round.add(
+ new TestRecord(
+ BinaryRow.EMPTY_ROW,
+ GenericRow.of(k1, BinaryString.fromString(k2),
v1, v2)));
+ expectedMap.put(String.format("%d, %s", k1, k2),
String.format("%d, %d", v1, v2));
+ }
+ testRecords.add(round);
+ expected.add(
+ expectedMap.entrySet().stream()
+ .map(e -> String.format("Record(%s, %s)",
e.getKey(), e.getValue()))
+ .collect(Collectors.toList()));
+ }
+
+ runCompatibilityTest(
+ rowType,
+ Collections.emptyList(),
+ Arrays.asList("k1", "k2"),
+ testRecords,
+ expected,
+ Record::toString);
+ }
+
+ @Test
+ public void testPartitionedPrimaryKeyTable() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+ BiFunction<Integer, String, BinaryRow> binaryRow =
+ (pt1, pt2) -> {
+ BinaryRow b = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ writer.writeInt(0, pt1);
+ writer.writeString(1, BinaryString.fromString(pt2));
+ writer.complete();
+ return b;
+ };
+
+ int numRounds = 20;
+ int numRecords = 500;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ boolean samePartitionEachRound = random.nextBoolean();
+
+ List<List<TestRecord>> testRecords = new ArrayList<>();
+ List<List<String>> expected = new ArrayList<>();
+ Map<String, String> expectedMap = new LinkedHashMap<>();
+ for (int r = 0; r < numRounds; r++) {
+ List<TestRecord> round = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) +
r) % 3;
+ String pt2 = String.valueOf(random.nextInt(10, 12));
+ String k = String.valueOf(random.nextInt(0, 100));
+ int v1 = random.nextInt();
+ long v2 = random.nextLong();
+ round.add(
+ new TestRecord(
+ binaryRow.apply(pt1, pt2),
+ GenericRow.of(
+ pt1,
+ BinaryString.fromString(pt2),
+ BinaryString.fromString(k),
+ v1,
+ v2)));
+ expectedMap.put(
+ String.format("%d, %s, %s", pt1, pt2, k),
String.format("%d, %d", v1, v2));
+ }
+ testRecords.add(round);
+ expected.add(
+ expectedMap.entrySet().stream()
+ .map(e -> String.format("Record(%s, %s)",
e.getKey(), e.getValue()))
+ .collect(Collectors.toList()));
+ }
+
+ runCompatibilityTest(
+ rowType,
+ Arrays.asList("pt1", "pt2"),
+ Arrays.asList("pt1", "pt2", "k"),
+ testRecords,
+ expected,
+ Record::toString);
+ }
+
+ private void runCompatibilityTest(
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ List<List<TestRecord>> testRecords,
+ List<List<String>> expected,
+ Function<Record, String> icebergRecordToString)
+ throws Exception {
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ partitionKeys,
+ primaryKeys,
+ primaryKeys.isEmpty() ? -1 : 2,
+ randomFormat(),
+ Collections.emptyMap());
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ for (int r = 0; r < testRecords.size(); r++) {
+ List<TestRecord> round = testRecords.get(r);
+ for (TestRecord testRecord : round) {
+ write.write(testRecord.record);
+ }
+
+ if (!primaryKeys.isEmpty()) {
+ for (BinaryRow partition :
+ round.stream().map(t ->
t.partition).collect(Collectors.toSet())) {
+ for (int b = 0; b < 2; b++) {
+ write.compact(partition, b, true);
+ }
+ }
+ }
+ commit.commit(r, write.prepareCommit(true, r));
+
+ assertThat(
+ getIcebergResult(
+ icebergTable ->
IcebergGenerics.read(icebergTable).build(),
+ icebergRecordToString))
+ .hasSameElementsAs(expected.get(r));
+ }
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSchemaAndPropertiesChange() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"k", "v"});
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ randomFormat(),
+ Collections.emptyMap());
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+ assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1,
10)", "Record(2, 20)");
+
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ // change1: add a column
+ // change2: change 'metadata.iceberg.delete-after-commit.enabled' to
false
+ // change3: change 'metadata.iceberg.previous-versions-max' to 10
+ schemaManager.commitChanges(
+ SchemaChange.addColumn("v2", DataTypes.STRING()),
+
SchemaChange.setOption(IcebergOptions.METADATA_DELETE_AFTER_COMMIT.key(),
"false"),
+
SchemaChange.setOption(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX.key(),
"10"));
+ table = table.copy(table.schemaManager().latest().get());
+ write.close();
+ write = table.newWrite(commitUser);
+ commit.close();
+ commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 11, BinaryString.fromString("one")));
+ write.write(GenericRow.of(3, 30, BinaryString.fromString("three")));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 11, one)", "Record(2, 20, null)",
"Record(3, 30, three)");
+
+ write.write(GenericRow.of(2, 21, BinaryString.fromString("two")));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 11, one)", "Record(2, 21, two)", "Record(3,
30, three)");
+
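+ // 3 commits were made: snapshot 1 (append), snapshots 2-3 (append + compact) and
+ // snapshots 4-5 (append + compact), so the current Iceberg snapshot id is 5.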
+ Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+ assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(5);
+ // 1 metadata file from createTable + 4 historical metadata files
+ assertThat(((BaseTable) icebergTable).operations().current().previousFiles().size())
+ .isEqualTo(5);
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSchemaChangeBeforeSync() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+ Map<String, String> options = new HashMap<>();
+ options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "disabled");
+ // disable iceberg compatibility
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ randomFormat(),
+ Collections.emptyMap())
+ .copy(options);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
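+ // Iceberg compatibility is disabled, so this commit is not synced to the REST catalog.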
+
+ // schema change
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+ schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING()));
+ table = table.copyWithLatestSchema();
+ write.close();
+ write = table.newWrite(commitUser);
+ commit.close();
+ commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 11, BinaryString.fromString("one")));
+ write.write(GenericRow.of(3, 30, BinaryString.fromString("three")));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // enable iceberg compatibility
+ options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "rest-catalog");
+ table = table.copy(options);
+ write.close();
+ write = table.newWrite(commitUser);
+ commit.close();
+ commit = table.newCommit(commitUser);
+
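+ // The first commit after re-enabling compatibility should sync the complete table state,
+ // including the rows and the schema change made while syncing was disabled.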
+ write.write(GenericRow.of(4, 40, BinaryString.fromString("four")));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 11, one)",
+ "Record(2, 20, null)",
+ "Record(3, 30, three)",
+ "Record(4, 40, four)");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testIcebergSnapshotExpire() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ randomFormat(),
+ options);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
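+ // Paimon retains at most 3 snapshots, so Iceberg snapshots should be expired in step with Paimon.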
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+ assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(1L);
+ FileIO fileIO = table.fileIO();
+ IcebergMetadata metadata =
+ IcebergMetadata.fromPath(
+ fileIO, new Path(catalogTableMetadataPath(table), "v1.metadata.json"));
+ Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+ assertThat(metadata.snapshots()).hasSize(1);
+ assertThat(metadata.currentSnapshotId()).isEqualTo(1);
+ // check table in rest-catalog
+ assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(1);
+ assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(1);
+
+ write.write(GenericRow.of(1, 11));
+ write.write(GenericRow.of(3, 30));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+ assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(3L);
+ icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+ metadata =
+ IcebergMetadata.fromPath(
+ fileIO, new Path(catalogTableMetadataPath(table), "v3.metadata.json"));
+ assertThat(metadata.snapshots()).hasSize(3);
+ assertThat(metadata.currentSnapshotId()).isEqualTo(3);
+ // check table in rest-catalog
+ assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(3);
+ assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(3);
+
+ // The number of Paimon snapshots will become 5 with the next commit, but only 3
+ // Iceberg snapshots are kept, so the first 2 Iceberg snapshots will be expired.
+
+ write.write(GenericRow.of(2, 21));
+ write.write(GenericRow.of(3, 31));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+ assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
+ metadata =
+ IcebergMetadata.fromPath(
+ fileIO, new Path(catalogTableMetadataPath(table), "v5.metadata.json"));
+ icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+ assertThat(metadata.snapshots()).hasSize(3);
+ assertThat(metadata.currentSnapshotId()).isEqualTo(5);
+ // check table in rest-catalog
+ assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(5);
+ assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(3);
+
+ write.close();
+ commit.close();
+
+ // Old metadata.json files are removed when a new metadata.json is created,
+ // depending on the metadata retention configuration.
+ assertThat(((BaseTable) icebergTable).operations().current().previousFiles().size())
+ .isEqualTo(1);
+
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 21)", "Record(3, 31)");
+ assertThat(
+ getIcebergResult(
+ t -> IcebergGenerics.read(t).useSnapshot(3).build(),
+ Record::toString))
+ .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+ assertThat(
+ getIcebergResult(
+ t -> IcebergGenerics.read(t).useSnapshot(4).build(),
+ Record::toString))
+ .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+ }
+
+ @Test
+ public void testWithIncorrectBase() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ randomFormat(),
+ Collections.emptyMap());
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ write.write(GenericRow.of(1, 11));
+ write.write(GenericRow.of(3, 30));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+ Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+ // 3 metadata files have been generated for the Iceberg table, and the current snapshot id is 3
+ assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(3);
+
+ // disable iceberg compatibility
+ Map<String, String> options = new HashMap<>();
+ options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "disabled");
+ table = table.copy(options);
+ write.close();
+ write = table.newWrite(commitUser);
+ commit.close();
+ commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(4, 40));
+ write.write(GenericRow.of(5, 50));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+ assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
+
+ // enable iceberg compatibility
+ options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "rest-catalog");
+ table = table.copy(options);
+ write.close();
+ write = table.newWrite(commitUser);
+ commit.close();
+ commit = table.newCommit(commitUser);
+
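+ // The Iceberg table's current snapshot (3) no longer matches Paimon's state, so the
+ // committer is expected to discard the stale base and rebuild the Iceberg metadata.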
+ write.write(GenericRow.of(6, 60));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(4, write.prepareCommit(true, 4));
+ assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(7L);
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 11)",
+ "Record(2, 20)",
+ "Record(3, 30)",
+ "Record(4, 40)",
+ "Record(5, 50)",
+ "Record(6, 60)");
+ icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+ assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(7);
+ assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(2);
+
+ write.write(GenericRow.of(4, 41));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(5, write.prepareCommit(true, 5));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 11)",
+ "Record(2, 20)",
+ "Record(3, 30)",
+ "Record(4, 41)",
+ "Record(5, 50)",
+ "Record(6, 60)");
+
+ write.close();
+ commit.close();
+ }
+
+ private static class TestRecord {
+ private final BinaryRow partition;
+ private final GenericRow record;
+
+ private TestRecord(BinaryRow partition, GenericRow record) {
+ this.partition = partition;
+ this.record = record;
+ }
+ }
+
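+ // Creates a Paimon table with 'metadata.iceberg.storage' = 'rest-catalog', wiring the
+ // REST catalog connection options from the REST catalog instance started for this test.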
+ private FileStoreTable createPaimonTable(
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ int numBuckets,
+ String fileFormat,
+ Map<String, String> customOptions)
+ throws Exception {
+ LocalFileIO fileIO = LocalFileIO.create();
+ org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(tempDir.toString());
+
+ Options options = new Options(customOptions);
+ options.set(CoreOptions.BUCKET, numBuckets);
+ options.set(
+ IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.REST_CATALOG);
+ options.set(CoreOptions.FILE_FORMAT, fileFormat);
+ options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
+ options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
+ options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 8);
+ options.set(IcebergOptions.METADATA_DELETE_AFTER_COMMIT, true);
+ options.set(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX, 1);
+ options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
+
+ // rest-catalog options
+ options.set(
+ IcebergOptions.REST_CONFIG_PREFIX + CatalogProperties.URI,
+ restCatalog.properties().get(CatalogProperties.URI));
+ options.set(
+ IcebergOptions.REST_CONFIG_PREFIX + CatalogProperties.WAREHOUSE_LOCATION,
+ restCatalog.properties().get(CatalogProperties.WAREHOUSE_LOCATION));
+ options.set(
+ IcebergOptions.REST_CONFIG_PREFIX + CatalogProperties.CLIENT_POOL_SIZE,
+ restCatalog.properties().get(CatalogProperties.CLIENT_POOL_SIZE));
+
+ org.apache.paimon.schema.Schema schema =
+ new org.apache.paimon.schema.Schema(
+ rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+ try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+ paimonCatalog.createDatabase("mydb", false);
+ Identifier paimonIdentifier = Identifier.create("mydb", "t");
+ paimonCatalog.createTable(paimonIdentifier, schema, false);
+ return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+ }
+ }
+
+ private List<String> getIcebergResult() throws Exception {
+ return getIcebergResult(
+ icebergTable -> IcebergGenerics.read(icebergTable).build(), Record::toString);
+ }
+
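+ // Reads the Iceberg table through the REST catalog and renders each record with the given function.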
+ private List<String> getIcebergResult(
+ Function<Table, CloseableIterable<Record>> query,
+ Function<Record, String> icebergRecordToString)
+ throws Exception {
+ Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+ CloseableIterable<Record> result = query.apply(icebergTable);
+ List<String> actual = new ArrayList<>();
+ for (Record record : result) {
+ actual.add(icebergRecordToString.apply(record));
+ }
+ result.close();
+ return actual;
+ }
+
+ private String randomFormat() {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int i = random.nextInt(3);
+ String[] formats = new String[] {"orc", "parquet", "avro"};
+ return formats[i];
+ }
+}