This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 32003fcaf [lake] Tolerate existing Paimon lake table if the schema and
properties match (#1847)
32003fcaf is described below
commit 32003fcaf420b08709691c3a996291357f002795
Author: Liebing <[email protected]>
AuthorDate: Wed Nov 12 21:39:54 2025 +0800
[lake] Tolerate existing Paimon lake table if the schema and properties
match (#1847)
---
.../apache/fluss/lake/lakestorage/LakeCatalog.java | 7 +
.../lakestorage/TestingLakeCatalogContext.java | 5 +
.../fluss/flink/catalog/FlinkCatalogTest.java | 10 +-
.../fluss/lake/paimon/PaimonLakeCatalog.java | 67 ++++-
.../{ => utils}/FlussDataTypeToPaimonDataType.java | 2 +-
.../fluss/lake/paimon/utils/PaimonConversions.java | 4 +-
.../lake/paimon/LakeEnabledTableCreateITCase.java | 275 +++++++++++++++++++--
.../fluss/lake/paimon/PaimonLakeCatalogTest.java | 12 +-
.../server/coordinator/CoordinatorService.java | 26 +-
.../fluss/server/coordinator/MetadataManager.java | 43 ++--
.../server/coordinator/LakeTableManagerITCase.java | 10 +-
.../lakehouse/TestingPaimonStoragePlugin.java | 5 +-
fluss-test-coverage/pom.xml | 2 +-
13 files changed, 386 insertions(+), 82 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 4048ca007..4cbccb6c1 100644
---
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -75,6 +75,13 @@ public interface LakeCatalog extends AutoCloseable {
@PublicEvolving
interface Context {
+ /**
+ * Whether the current operation is creating a fluss table.
+ *
+ * @return true if the current operation is creating a fluss table
+ */
+ boolean isCreatingFlussTable();
+
/** Get the fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
index b57ff94cf..7406b13b6 100644
---
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
+++
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
@@ -22,6 +22,11 @@ import org.apache.fluss.security.acl.FlussPrincipal;
/** A testing implementation of {@link LakeCatalog.Context}. */
public class TestingLakeCatalogContext implements LakeCatalog.Context {
+ @Override
+ public boolean isCreatingFlussTable() {
+ return false;
+ }
+
@Override
public FlussPrincipal getFlussPrincipal() {
return null;
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 84a85dead..abaf8e7e0 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -308,13 +308,9 @@ class FlinkCatalogTest {
assertThat(catalog.tableExists(lakeTablePath)).isTrue();
// drop fluss table
catalog.dropTable(lakeTablePath, false);
- // create the table again, should throw exception with ignore if exist
= false
- assertThatThrownBy(() -> catalog.createTable(lakeTablePath, table,
false))
- .isInstanceOf(CatalogException.class)
- .hasMessage(
- String.format(
- "The table %s already exists in %s catalog,
please first drop the table in %s catalog or use a new table name.",
- lakeTablePath, "paimon", "paimon"));
+ assertThat(catalog.tableExists(lakeTablePath)).isFalse();
+ // create the table again should be ok, because the existing lake
table is matched
+ catalog.createTable(lakeTablePath, table, false);
}
@Test
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 22a189208..712c2af38 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -18,6 +18,7 @@
package org.apache.fluss.lake.paimon;
import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
@@ -27,6 +28,7 @@ import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.IOUtils;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -34,12 +36,16 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
@@ -83,12 +89,12 @@ public class PaimonLakeCatalog implements LakeCatalog {
Identifier paimonPath = toPaimon(tablePath);
Schema paimonSchema = toPaimonSchema(tableDescriptor);
try {
- createTable(paimonPath, paimonSchema);
+ createTable(paimonPath, paimonSchema,
context.isCreatingFlussTable());
} catch (Catalog.DatabaseNotExistException e) {
// create database
createDatabase(tablePath.getDatabaseName());
try {
- createTable(paimonPath, paimonSchema);
+ createTable(paimonPath, paimonSchema,
context.isCreatingFlussTable());
} catch (Catalog.DatabaseNotExistException t) {
// shouldn't happen in normal cases
throw new RuntimeException(
@@ -114,13 +120,31 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
}
- private void createTable(Identifier tablePath, Schema schema)
+ private void createTable(Identifier tablePath, Schema schema, boolean
isCreatingFlussTable)
throws Catalog.DatabaseNotExistException {
try {
// not ignore if table exists
paimonCatalog.createTable(tablePath, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
- throw new TableAlreadyExistException("Table " + tablePath + "
already exists.");
+ try {
+ Table table = paimonCatalog.getTable(tablePath);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ validatePaimonSchemaCapability(
+ tablePath, fileStoreTable.schema().toSchema(), schema);
+ // if creating a new fluss table, we should ensure the lake
table is empty
+ if (isCreatingFlussTable) {
+ checkTableIsEmpty(tablePath, fileStoreTable);
+ }
+ } catch (Catalog.TableNotExistException tableNotExistException) {
+ // shouldn't happen in normal cases
+ throw new RuntimeException(
+ String.format(
+ "Failed to create table %s in Paimon. The
table already existed "
+ + "during the initial creation
attempt, but subsequently "
+ + "could not be found when trying to
get it. "
+ + "Please check whether the Paimon
table was manually deleted, and try again.",
+ tablePath));
+ }
}
}
@@ -142,6 +166,41 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
}
+ private void validatePaimonSchemaCapability(
+ Identifier tablePath, Schema existingSchema, Schema newSchema) {
+ // Adjust options for comparison
+ Map<String, String> existingOptions = existingSchema.options();
+ Map<String, String> newOptions = newSchema.options();
+ // `path` will be set automatically by Paimon, so we need to remove it
in existing options
+ existingOptions.remove(CoreOptions.PATH.key());
+ // when enable datalake with an existing table,
`table.datalake.enabled` will be `false`
+ // in existing options, but `true` in new options.
+ String datalakeConfigKey = FLUSS_CONF_PREFIX +
ConfigOptions.TABLE_DATALAKE_ENABLED.key();
+ if
(Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey)))
{
+ existingOptions.remove(datalakeConfigKey);
+ newOptions.remove(datalakeConfigKey);
+ }
+
+ if (!existingSchema.equals(newSchema)) {
+ throw new TableAlreadyExistException(
+ String.format(
+ "The table %s already exists in Paimon catalog,
but the table schema is not compatible. "
+ + "Existing schema: %s, new schema: %s. "
+ + "Please first drop the table in Paimon
catalog or use a new table name.",
+ tablePath.getEscapedFullName(), existingSchema,
newSchema));
+ }
+ }
+
+ private void checkTableIsEmpty(Identifier tablePath, FileStoreTable table)
{
+ if (table.latestSnapshot().isPresent()) {
+ throw new TableAlreadyExistException(
+ String.format(
+ "The table %s already exists in Paimon catalog,
and the table is not empty. "
+ + "Please first drop the table in Paimon
catalog or use a new table name.",
+ tablePath.getEscapedFullName()));
+ }
+ }
+
@Override
public void close() {
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java
similarity index 99%
rename from
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java
rename to
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java
index 204b35119..e97b4f67a 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.fluss.lake.paimon;
+package org.apache.fluss.lake.paimon.utils;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.BigIntType;
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 1f57ba3b6..03f9ae753 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -20,7 +20,6 @@ package org.apache.fluss.lake.paimon.utils;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
-import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
@@ -57,7 +56,7 @@ public class PaimonConversions {
private static final String PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY =
"partition.legacy-name";
// for fluss config
- private static final String FLUSS_CONF_PREFIX = "fluss.";
+ public static final String FLUSS_CONF_PREFIX = "fluss.";
// for paimon config
private static final String PAIMON_CONF_PREFIX = "paimon.";
@@ -67,6 +66,7 @@ public class PaimonConversions {
static {
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key());
PAIMON_UNSETTABLE_OPTIONS.add(PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY);
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 37033ea74..3e47bdc46 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -34,12 +34,20 @@ import
org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.AfterEach;
@@ -55,6 +63,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Stream;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
@@ -401,6 +410,110 @@ class LakeEnabledTableCreateITCase {
}
}
+ @Test
+ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("k1", "v1");
+ customProperties.put("paimon.file.format", "parquet");
+
+ // test for existing lake table
+ TableDescriptor td =
+ createTableDescriptor(
+ 2,
+ BUCKET_NUM,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ customProperties,
+ false);
+ TablePath tablePath = TablePath.of(DATABASE,
"log_table_with_exist_lake_table");
+ admin.createTable(tablePath, td, false).get();
+ // drop fluss table, lake table should still exist
+ admin.dropTable(tablePath, false).get();
+ // create the same fluss table again should be ok
+ admin.createTable(tablePath, td, false).get();
+ admin.dropTable(tablePath, false).get();
+
+ // paimon table use dynamic bucket for fluss log table without bucket
keys
+ // so it should be ok to create the same fluss table with a new bucket
num
+ td = td.withBucketCount(BUCKET_NUM + 1);
+ admin.createTable(tablePath, td, false).get();
+ admin.dropTable(tablePath, false).get();
+
+ // create log table with bucket keys will throw exception
+ TableDescriptor logTableWithoutBucketKeys1 =
+ createTableDescriptor(
+ 2,
+ BUCKET_NUM,
+ Arrays.asList("c1", "c2"),
+ Collections.emptyList(),
+ customProperties,
+ false);
+ assertThatThrownBy(
+ () -> admin.createTable(tablePath,
logTableWithoutBucketKeys1, false).get())
+ .cause()
+ .isInstanceOf(LakeTableAlreadyExistException.class)
+ .hasMessage(
+ "The table `fluss`.`log_table_with_exist_lake_table`
already exists in Paimon catalog, but the table schema is not compatible. "
+ + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Please first drop the table in Paimon
catalog or use a new table name.");
+
+ // create log table with different fields will throw exception
+ TableDescriptor logTableWithoutBucketKeys2 =
+ createTableDescriptor(
+ 3,
+ BUCKET_NUM,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ customProperties,
+ false);
+ assertThatThrownBy(
+ () -> admin.createTable(tablePath,
logTableWithoutBucketKeys2, false).get())
+ .cause()
+ .isInstanceOf(LakeTableAlreadyExistException.class)
+ .hasMessage(
+ "The table `fluss`.`log_table_with_exist_lake_table`
already exists in Paimon catalog, but the table schema is not compatible. "
+ + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp`
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[],
options={bucket=-1, fluss.table.replication.factor=1,
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon,
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Please first drop the table in Paimon
catalog or use a new table name.");
+ }
+
+ @Test
+ void testCreateLakeEnableTableWithExistNonEmptyLakeTable() throws
Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("k1", "v1");
+ customProperties.put("paimon.file.format", "parquet");
+
+ // test for existing lake table with some data
+ TableDescriptor td =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .customProperties(customProperties)
+ .build();
+ TablePath tablePath = TablePath.of(DATABASE,
"log_table_with_non_empty_lake_table");
+ admin.createTable(tablePath, td, false).get();
+ // drop fluss table, lake table should still exist
+ admin.dropTable(tablePath, false).get();
+ // create the same fluss table again should be ok
+ admin.createTable(tablePath, td, false).get();
+ admin.dropTable(tablePath, false).get();
+
+ // write some data to the lake table
+ writeData(
+ paimonCatalog.getTable(
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName())));
+ assertThatThrownBy(() -> admin.createTable(tablePath, td, false).get())
+ .cause()
+ .isInstanceOf(LakeTableAlreadyExistException.class)
+ .hasMessage(
+ "The table
`fluss`.`log_table_with_non_empty_lake_table` already exists in Paimon catalog,
and the table is not empty. Please first drop the table in Paimon catalog or
use a new table name.");
+ }
+
@Test
void testAlterLakeEnabledLogTable() throws Exception {
Map<String, String> customProperties = new HashMap<>();
@@ -435,8 +548,8 @@ class LakeEnabledTableCreateITCase {
admin.alterTable(logTablePath, changes, false).get();
- Table enabledPaimonLogTable =
- paimonCatalog.getTable(Identifier.create(DATABASE,
logTablePath.getTableName()));
+ Identifier paimonTablePath = Identifier.create(DATABASE,
logTablePath.getTableName());
+ Table enabledPaimonLogTable = paimonCatalog.getTable(paimonTablePath);
Map<String, String> updatedProperties = new HashMap<>();
updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
@@ -470,20 +583,26 @@ class LakeEnabledTableCreateITCase {
changes = Collections.singletonList(disableLake);
admin.alterTable(logTablePath, changes, false).get();
// paimon table should still exist although lake is disabled
- paimonCatalog.getTable(Identifier.create(DATABASE,
logTablePath.getTableName()));
+ paimonCatalog.getTable(paimonTablePath);
// try to enable lake table again
enableLake =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
- List<TableChange> finalChanges = Collections.singletonList(enableLake);
- // TODO: After #846 is implemented, we should remove this exception
assertion.
- assertThatThrownBy(() -> admin.alterTable(logTablePath, finalChanges,
false).get())
- .cause()
- .isInstanceOf(LakeTableAlreadyExistException.class)
- .hasMessage(
- String.format(
- "The table %s already exists in paimon
catalog, please "
- + "first drop the table in paimon
catalog or use a new table name.",
- logTablePath));
+ changes = Collections.singletonList(enableLake);
+ admin.alterTable(logTablePath, changes, false).get();
+
+ // write some data to the lake table
+ writeData(paimonCatalog.getTable(paimonTablePath));
+ Optional<Snapshot> snapshot =
paimonCatalog.getTable(paimonTablePath).latestSnapshot();
+ assertThat(snapshot).isNotEmpty();
+
+ // disable lake table again
+ changes = Collections.singletonList(disableLake);
+ admin.alterTable(logTablePath, changes, false).get();
+
+ // try to enable lake table again, the snapshot should not change
+ changes = Collections.singletonList(enableLake);
+ admin.alterTable(logTablePath, changes, false).get();
+
assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot);
}
@Test
@@ -602,7 +721,7 @@ class LakeEnabledTableCreateITCase {
.cause()
.isInstanceOf(FlussRuntimeException.class)
.hasMessageContaining(
- "Lake table doesn't exists for lake-enabled table "
+ "Lake table doesn't exist for lake-enabled table "
+ tablePath
+ ", which shouldn't be happened. Please check
if the lake table was deleted manually.");
@@ -610,6 +729,84 @@ class LakeEnabledTableCreateITCase {
admin.alterTable(TablePath.of(DATABASE, "not_exist_table"),
tableChanges, true).get();
}
+ @Test
+ void testEnableLakeTableAfterAlterTableProperties() throws Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("k1", "v1");
+ customProperties.put("paimon.file.format", "parquet");
+
+ // create table
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+ .customProperties(customProperties)
+ .distributedBy(BUCKET_NUM, "c1", "c2")
+ .build();
+ TablePath tablePath = TablePath.of(DATABASE,
"enable_lake_table_after_alter_properties");
+ admin.createTable(tablePath, tableDescriptor, false).get();
+ // paimon table should not exist because lake table is disable
+ assertThatThrownBy(
+ () ->
+ paimonCatalog.getTable(
+ Identifier.create(DATABASE,
tablePath.getTableName())))
+ .isInstanceOf(Catalog.TableNotExistException.class)
+ .hasMessage(String.format("Table %s does not exist.",
tablePath));
+
+ // alter fluss table properties
+ List<TableChange> tableChanges =
+ Arrays.asList(TableChange.reset("k1"), TableChange.set("k2",
"v2"));
+ admin.alterTable(tablePath, tableChanges, false).get();
+ // enable lake table should be ok
+ TableChange.SetOption enableLake =
+ TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
+ admin.alterTable(tablePath, Collections.singletonList(enableLake),
false).get();
+ Table paimonTable =
+ paimonCatalog.getTable(Identifier.create(DATABASE,
tablePath.getTableName()));
+ customProperties.remove("k1");
+ customProperties.put("k2", "v2");
+ Map<String, String> newProperties = new
HashMap<>(tableDescriptor.getProperties());
+ newProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+ tableDescriptor = tableDescriptor.withProperties(newProperties,
customProperties);
+ verifyPaimonTable(
+ paimonTable,
+ tableDescriptor,
+ RowType.of(
+ new DataType[] {
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.STRING(),
+ // for __bucket, __offset, __timestamp
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.BIGINT(),
+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ },
+ new String[] {
+ "c1",
+ "c2",
+ BUCKET_COLUMN_NAME,
+ OFFSET_COLUMN_NAME,
+ TIMESTAMP_COLUMN_NAME
+ }),
+ "c1,c2",
+ BUCKET_NUM);
+
+ // disable lake table
+ TableChange.SetOption disableLake =
+ TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"false");
+ admin.alterTable(tablePath, Collections.singletonList(disableLake),
false).get();
+
+ // alter fluss table properties when lake table is disabled
+ tableChanges = Collections.singletonList(TableChange.set("k2", "v22"));
+ admin.alterTable(tablePath, tableChanges, false).get();
+
+ // enable lake table again should be ok, even though the table
properties have changed
+ admin.alterTable(tablePath, Collections.singletonList(enableLake),
false).get();
+ }
+
private void verifyPaimonTable(
Table paimonTable,
TableDescriptor flussTable,
@@ -657,4 +854,54 @@ class LakeEnabledTableCreateITCase {
RowType paimonRowType = paimonTable.rowType();
assertThat(paimonRowType).isEqualTo(expectedRowType);
}
+
+ private TableDescriptor createTableDescriptor(
+ int columnNum,
+ int bucketNum,
+ List<String> bucketKeys,
+ List<String> partitionKeys,
+ Map<String, String> customProperties,
+ boolean withPrimaryKeys) {
+ Schema.Builder builder = Schema.newBuilder();
+ for (int i = 1; i <= columnNum; i++) {
+ if (i % 2 == 0) {
+ builder.column("c" + i, DataTypes.INT());
+ } else {
+ builder.column("c" + i, DataTypes.STRING());
+ }
+ }
+ if (withPrimaryKeys) {
+ builder.primaryKey("c1");
+ }
+
+ return TableDescriptor.builder()
+ .schema(builder.build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .customProperties(customProperties)
+ .distributedBy(bucketNum, bucketKeys)
+ .partitionedBy(partitionKeys)
+ .build();
+ }
+
+ private void writeData(Table table) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+
+ for (int i = 0; i < 10; i++) {
+ GenericRow row =
+ GenericRow.of(
+ i,
+ BinaryString.fromString("row-" + i),
+ 0,
+ (long) i,
+
Timestamp.fromEpochMillis(System.currentTimeMillis()));
+ write.write(row);
+ }
+
+ List<CommitMessage> messages = write.prepareCommit();
+ commit.commit(messages);
+ }
+ assertThat(table.latestSnapshot()).isNotEmpty();
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
index a9959d821..02b5f1da2 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -32,7 +32,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
-import java.util.Arrays;
+import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
@@ -51,9 +51,9 @@ class PaimonLakeCatalogTest {
}
@Test
- void testAlterTableConfigs() throws Exception {
- String database = "test_alter_table_configs_db";
- String tableName = "test_alter_table_configs_table";
+ void testAlterTableProperties() throws Exception {
+ String database = "test_alter_table_properties_db";
+ String tableName = "test_alter_table_properties_table";
TablePath tablePath = TablePath.of(database, tableName);
Identifier identifier = Identifier.create(database, tableName);
createTable(database, tableName);
@@ -65,7 +65,7 @@ class PaimonLakeCatalogTest {
// set the value for key
flussPaimonCatalog.alterTable(
tablePath,
- Arrays.asList(TableChange.set("key", "value")),
+ Collections.singletonList(TableChange.set("key", "value")),
new TestingLakeCatalogContext());
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
@@ -75,7 +75,7 @@ class PaimonLakeCatalogTest {
// reset the value for key
flussPaimonCatalog.alterTable(
tablePath,
- Arrays.asList(TableChange.reset("key")),
+ Collections.singletonList(TableChange.reset("key")),
new TestingLakeCatalogContext());
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index d45cc0231..54480d663 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -283,8 +283,6 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
tableAssignment = generateAssignment(bucketCount, replicaFactor,
servers);
}
- // TODO: should tolerate if the lake exist but matches our schema.
This ensures eventually
- // consistent by idempotently creating the table multiple times. See
#846
// before create table in fluss, we may create in lake
if (isDataLakeEnabled(tableDescriptor)) {
try {
@@ -292,15 +290,10 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
.createTable(
tablePath,
tableDescriptor,
- new
DefaultLakeCatalogContext(currentSession().getPrincipal()));
+ new DefaultLakeCatalogContext(
+ true,
currentSession().getPrincipal()));
} catch (TableAlreadyExistException e) {
- throw new LakeTableAlreadyExistException(
- String.format(
- "The table %s already exists in %s catalog,
please "
- + "first drop the table in %s catalog
or use a new table name.",
- tablePath,
- lakeCatalogContainer.getDataLakeFormat(),
- lakeCatalogContainer.getDataLakeFormat()));
+ throw new LakeTableAlreadyExistException(e.getMessage(), e);
}
}
@@ -330,9 +323,8 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
tablePropertyChanges,
request.isIgnoreIfNotExists(),
lakeCatalogContainer.getLakeCatalog(),
- lakeCatalogContainer.getDataLakeFormat(),
lakeTableTieringManager,
- new
DefaultLakeCatalogContext(currentSession().getPrincipal()));
+ new DefaultLakeCatalogContext(false,
currentSession().getPrincipal()));
return CompletableFuture.completedFuture(new AlterTableResponse());
}
@@ -766,12 +758,20 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
static class DefaultLakeCatalogContext implements LakeCatalog.Context {
+ private final boolean isCreatingFlussTable;
private final FlussPrincipal flussPrincipal;
- public DefaultLakeCatalogContext(FlussPrincipal flussPrincipal) {
+ public DefaultLakeCatalogContext(
+ boolean isCreatingFlussTable, FlussPrincipal flussPrincipal) {
+ this.isCreatingFlussTable = isCreatingFlussTable;
this.flussPrincipal = flussPrincipal;
}
+ @Override
+ public boolean isCreatingFlussTable() {
+ return isCreatingFlussTable;
+ }
+
@Override
public FlussPrincipal getFlussPrincipal() {
return flussPrincipal;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 074cb04f2..ed544a8d1 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -35,7 +35,6 @@ import
org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.exception.TooManyPartitionsException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
-import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -327,7 +326,6 @@ public class MetadataManager {
TablePropertyChanges tablePropertyChanges,
boolean ignoreIfNotExists,
@Nullable LakeCatalog lakeCatalog,
- @Nullable DataLakeFormat dataLakeFormat,
LakeTableTieringManager lakeTableTieringManager,
LakeCatalog.Context lakeCatalogContext) {
try {
@@ -360,7 +358,6 @@ public class MetadataManager {
newDescriptor,
tableChanges,
lakeCatalog,
- dataLakeFormat,
lakeCatalogContext);
// update the table to zk
TableRegistration updatedTableRegistration =
@@ -400,7 +397,6 @@ public class MetadataManager {
TableDescriptor newDescriptor,
List<TableChange> tableChanges,
LakeCatalog lakeCatalog,
- DataLakeFormat dataLakeFormat,
LakeCatalog.Context lakeCatalogContext) {
if (isDataLakeEnabled(newDescriptor)) {
if (lakeCatalog == null) {
@@ -410,38 +406,31 @@ public class MetadataManager {
+ " in data lake, because the Fluss cluster
doesn't enable datalake tables.");
}
- boolean isLakeTableNewlyCreated = false;
// to enable lake table
if (!isDataLakeEnabled(tableDescriptor)) {
// before create table in fluss, we may create in lake
try {
lakeCatalog.createTable(tablePath, newDescriptor,
lakeCatalogContext);
- // no need to alter lake table if it is newly created
- isLakeTableNewlyCreated = true;
} catch (TableAlreadyExistException e) {
- // TODO: should tolerate if the lake exist but matches our schema. This ensures
- // eventually consistent by idempotently creating the table multiple times. See
- // #846
- throw new LakeTableAlreadyExistException(
- String.format(
- "The table %s already exists in %s catalog, please "
- + "first drop the table in %s catalog or use a new table name.",
- tablePath, dataLakeFormat, dataLakeFormat));
+ throw new LakeTableAlreadyExistException(e.getMessage(), e);
}
}
+ }
- // only need to alter lake table if it is not newly created
- if (!isLakeTableNewlyCreated) {
- {
- try {
- lakeCatalog.alterTable(tablePath, tableChanges, lakeCatalogContext);
- } catch (TableNotExistException e) {
- throw new FlussRuntimeException(
- "Lake table doesn't exists for lake-enabled table "
- + tablePath
- + ", which shouldn't be happened. Please check if the lake table was deleted manually.",
- e);
- }
+ // We should always alter lake table even though datalake is disabled.
+ // Otherwise, if user alter the fluss table when datalake is disabled, then enable datalake
+ // again, the lake table will mismatch.
+ if (lakeCatalog != null) {
+ try {
+ lakeCatalog.alterTable(tablePath, tableChanges, lakeCatalogContext);
+ } catch (TableNotExistException e) {
+ // only throw TableNotExistException if datalake is enabled
+ if (isDataLakeEnabled(newDescriptor)) {
+ throw new FlussRuntimeException(
+ "Lake table doesn't exist for lake-enabled table "
+ + tablePath
+ + ", which shouldn't be happened. Please check if the lake table was deleted manually.",
+ e);
}
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index 8040bbd93..fb653b7d6 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -19,7 +19,7 @@ package org.apache.fluss.server.coordinator;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.LakeTableAlreadyExistException;
+import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
@@ -94,7 +94,7 @@ class LakeTableManagerITCase {
adminGateway
.createTable(newCreateTableRequest(lakeTablePath,
lakeTableDescriptor, false))
.get();
- // create again, should throw TableAlreadyExistException thrown by lake
+ // create again, should throw TableAlreadyExistException thrown by Fluss
assertThatThrownBy(
() ->
adminGateway
@@ -103,9 +103,7 @@ class LakeTableManagerITCase {
lakeTablePath,
lakeTableDescriptor, false))
.get())
.cause()
- .isInstanceOf(LakeTableAlreadyExistException.class)
- .hasMessage(
- "The table %s already exists in paimon catalog, please
first drop the table in paimon catalog or use a new table name.",
- lakeTablePath);
+ .isInstanceOf(TableAlreadyExistException.class)
+ .hasMessage("Table %s already exists.", lakeTablePath);
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
index c726e22e3..aa030d025 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
@@ -78,7 +78,10 @@ public class TestingPaimonStoragePlugin implements
LakeStoragePlugin {
TablePath tablePath, TableDescriptor tableDescriptor, Context
context)
throws TableAlreadyExistException {
if (tableByPath.containsKey(tablePath)) {
- throw new TableAlreadyExistException("Table " + tablePath + "
already exists");
+ TableDescriptor existingTable = tableByPath.get(tablePath);
+ if (!existingTable.equals(tableDescriptor)) {
+ throw new TableAlreadyExistException("Table " + tablePath
+ " already exists.");
+ }
}
tableByPath.put(tablePath, tableDescriptor);
}
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 6a03e377a..0c6f54cd5 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -384,7 +384,7 @@
<exclude>org.apache.fluss.flink.DummyClass120</exclude>
<exclude>org.apache.fluss.lake.batch.ArrowRecordBatch</exclude>
<exclude>org.apache.fluss.lake.committer.CommittedLakeSnapshot</exclude>
- <exclude>org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType</exclude>
+ <exclude>org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType</exclude>
<!-- start exclude for lake lance -->
<exclude>org.apache.fluss.lake.lance.*</exclude>
<!-- temporarily exclude iceberg -->