This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c24830e3a5 [core] Don't allow full compaction with lookup changelog producer (#7999)
c24830e3a5 is described below
commit c24830e3a598e1c1ece21a7557face3cfc8155e7
Author: Arnav Balyan <[email protected]>
AuthorDate: Sat Jun 6 14:18:07 2026 +0530
[core] Don't allow full compaction with lookup changelog producer (#7999)
---
.../org/apache/paimon/schema/SchemaValidation.java | 18 +++++++++++++++-
.../apache/paimon/schema/SchemaValidationTest.java | 25 ++++++++++++++++++++++
.../apache/paimon/flink/CatalogTableITCase.java | 2 +-
3 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index c600927af0..85eba1614b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -63,6 +63,7 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
+import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
@@ -93,7 +94,7 @@ import static org.apache.paimon.types.VectorType.fieldsInVectorFile;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
-/** Validation utils for {@link TableSchema}. */
+/** Validation utilities for {@link TableSchema}. */
public class SchemaValidation {
public static final List<Class<? extends DataType>>
PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES =
@@ -158,6 +159,21 @@ public class SchemaValidation {
ChangelogProducer.LOOKUP));
}
+ if (options.fullCompactionDeltaCommits() != null
+ && changelogProducer == ChangelogProducer.LOOKUP) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "'%s' is incompatible with '%s'='%s'. "
+ + "Use '%s'='%s' to get periodic full compaction with changelog generation, "
+ + "or remove '%s'.",
+ FULL_COMPACTION_DELTA_COMMITS.key(),
+ CHANGELOG_PRODUCER.key(),
+ ChangelogProducer.LOOKUP,
+ CHANGELOG_PRODUCER.key(),
+ ChangelogProducer.FULL_COMPACTION,
+ FULL_COMPACTION_DELTA_COMMITS.key()));
+ }
+
checkArgument(
options.snapshotNumRetainMin() > 0,
SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 6bda437692..2b61696956 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -729,4 +729,29 @@ class SchemaValidationTest {
.hasMessageContaining(
"deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true");
}
+
+ @Test
+ public void testFullCompactionDeltaCommitsWithLookupChangelogProducer() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
+ options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+ assertThatThrownBy(() -> validateTableSchemaExec(options))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key())
+ .hasMessageContaining("lookup")
+ .hasMessageContaining("full-compaction");
+
+ options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+ assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
+
+ options.clear();
+ options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
+ assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
+
+ options.clear();
+ options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+ assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
+ options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "input");
+ assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 7a82007870..ee14caa381 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -1171,7 +1171,7 @@ public class CatalogTableITCase extends CatalogITCaseBase {
sql("CALL sys.create_branch('default.T', 'stream')");
sql("ALTER TABLE T SET ('scan.fallback-branch' = 'stream')");
sql(
- "ALTER TABLE T$branch_stream SET ('primary-key' = 'k,v', 'bucket' = '2','changelog-producer' = 'lookup')");
+ "ALTER TABLE T$branch_stream SET ('primary-key' = 'k,v', 'bucket' = '2','changelog-producer' = 'full-compaction')");
// full compaction will always be performed at the end of batch jobs, as long as
// full-compaction.delta-commits is set, regardless of its value
sql("show create table T$branch_stream");