luoyuxia commented on code in PR #1847:
URL: https://github.com/apache/fluss/pull/1847#discussion_r2518010513
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -474,16 +732,8 @@ void testAlterLakeEnabledLogTable() throws Exception {
// try to enable lake table again
Review Comment:
Enhance the test with the most common case:
1. write some data
2. disable datalake
3. enable datalake again, and verify the data still exists. To keep the code
clean, we can just verify the snapshot.
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -142,6 +167,67 @@ private void alterTable(Identifier tablePath,
List<SchemaChange> tableChanges)
}
}
+ private void validatePaimonSchemaCapability(
+ Identifier tablePath, Schema existingSchema, Schema newSchema) {
+ // Adjust options for comparison
+ Map<String, String> existingOptions = new
HashMap<>(existingSchema.options());
Review Comment:
I'm wondering whether the following code is better:
```
private void validatePaimonSchemaCapability(
Identifier tablePath, Schema existingSchema, Schema newSchema) {
// Adjust options for comparison
Map<String, String> existingOptions = existingSchema.options();
Map<String, String> newOptions = newSchema.options();
// `path` will be set automatically by Paimon, so we need to remove
it in existing options
existingOptions.remove(CoreOptions.PATH.key());
// when enable datalake with an existing table,
`table.datalake.enabled` will be `false`
// in existing options, but `true` in new options.
String datalakeConfigKey = FLUSS_CONF_PREFIX +
ConfigOptions.TABLE_DATALAKE_ENABLED.key();
if
(Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey)))
{
existingOptions.remove(datalakeConfigKey);
newOptions.remove(datalakeConfigKey);
}
if (!existingSchema.equals(newSchema)) {
throw new TableAlreadyExistException(
String.format(
"The table %s already exists in Paimon catalog,
but the table schema is not compatible. "
+ "Existing schema: %s, new schema: %s. "
+ "Please first drop the table in Paimon
catalog or use a new table name.",
tablePath.getEscapedFullName(),
existingSchema,
newSchema));
}
}
```
which modifies the schema's options directly.
I'm afraid that the Paimon schema may introduce another field in the future, and we
would forget to set that field in the method `buildSchemaWithOptions`.
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -401,6 +408,257 @@ void
testCreateLakeEnableTableWithUnsettablePaimonOptions() {
}
}
+ @Test
+ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("k1", "v1");
+ customProperties.put("paimon.file.format", "parquet");
+
+ // 1. test for existing lake table without bucket keys
+ TableDescriptor logTableWithoutBucketKeys =
+ createTableDescriptor(
+ 2,
+ BUCKET_NUM,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ customProperties,
+ false);
+ TablePath logTablePathWithoutBucketkeys =
+ TablePath.of(DATABASE, "log_table_without_bucket_keys");
+ admin.createTable(logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys, false).get();
+ // drop fluss table, lake table should still exist
+ admin.dropTable(logTablePathWithoutBucketkeys, false).get();
+ // create the same fluss table again should be ok
+ admin.createTable(logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys, false).get();
+ admin.dropTable(logTablePathWithoutBucketkeys, false).get();
+
+ // paimon table use dynamic bucket for fluss log table without bucket
keys
+ // so it should be ok to create the same fluss table with a new bucket
num
+ logTableWithoutBucketKeys =
logTableWithoutBucketKeys.withBucketCount(BUCKET_NUM + 1);
+ admin.createTable(logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys, false).get();
+ admin.dropTable(logTablePathWithoutBucketkeys, false).get();
+
+ // create log table with bucket keys will throw exception
+ TableDescriptor logTableWithoutBucketKeys1 =
+ createTableDescriptor(
+ 2,
+ BUCKET_NUM,
+ Arrays.asList("c1", "c2"),
+ Collections.emptyList(),
+ customProperties,
+ false);
+ assertThatThrownBy(
+ () ->
+ admin.createTable(
+ logTablePathWithoutBucketkeys,
+ logTableWithoutBucketKeys1,
+ false)
+ .get())
+ .cause()
+ .isInstanceOf(LakeTableAlreadyExistException.class)
+ .hasMessage(
+ "The table `fluss`.`log_table_without_bucket_keys`
already exists in Paimon catalog, but the table schema is not compatible. "
+ + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Please first drop the table in Paimon
catalog or use a new table name.");
+
+ // create log table with different fields will throw exception
+ TableDescriptor logTableWithoutBucketKeys2 =
+ createTableDescriptor(
+ 3,
+ BUCKET_NUM,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ customProperties,
+ false);
+ assertThatThrownBy(
+ () ->
+ admin.createTable(
+ logTablePathWithoutBucketkeys,
+ logTableWithoutBucketKeys2,
+ false)
+ .get())
+ .cause()
+ .isInstanceOf(LakeTableAlreadyExistException.class)
+ .hasMessage(
+ "The table `fluss`.`log_table_without_bucket_keys`
already exists in Paimon catalog, but the table schema is not compatible. "
+ + "Existing schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp`
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[],
options={bucket=-1, fluss.table.replication.factor=1,
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon,
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Please first drop the table in Paimon
catalog or use a new table name.");
+
+ // create table with primary keys will throw exception
Review Comment:
I think verifying the basic cases should be fine. There is no need to verify every
case here, since the code path should be the same.
```
void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
Map<String, String> customProperties = new HashMap<>();
customProperties.put("k1", "v1");
customProperties.put("paimon.file.format", "parquet");
// 1. test for existing lake table without bucket keys
TableDescriptor logTableWithoutBucketKeys =
createTableDescriptor(
2,
BUCKET_NUM,
Collections.emptyList(),
Collections.emptyList(),
customProperties,
false);
TablePath logTablePathWithoutBucketkeys =
TablePath.of(DATABASE, "log_table_without_bucket_keys");
admin.createTable(logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys, false).get();
// drop fluss table, lake table should still exist
admin.dropTable(logTablePathWithoutBucketkeys, false).get();
// create the same fluss table again should be ok
admin.createTable(logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys, false).get();
admin.dropTable(logTablePathWithoutBucketkeys, false).get();
// paimon table use dynamic bucket for fluss log table without
bucket keys
// so it should be ok to create the same fluss table with a new
bucket num
logTableWithoutBucketKeys =
logTableWithoutBucketKeys.withBucketCount(BUCKET_NUM + 1);
admin.createTable(logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys, false).get();
admin.dropTable(logTablePathWithoutBucketkeys, false).get();
// create log table with bucket keys will throw exception
TableDescriptor logTableWithoutBucketKeys1 =
createTableDescriptor(
2,
BUCKET_NUM,
Arrays.asList("c1", "c2"),
Collections.emptyList(),
customProperties,
false);
assertThatThrownBy(
() ->
admin.createTable(
logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys1,
false)
.get())
.cause()
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_without_bucket_keys`
already exists in Paimon catalog, but the table schema is not compatible. "
+ "Existing schema:
UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT,
`__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[],
primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1,
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon,
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6)
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3,
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true,
fluss.table.datalake.format=paimon, partition.legacy-name=false,
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon
catalog or use a new table name.");
// create log table with different fields will throw exception
TableDescriptor logTableWithoutBucketKeys2 =
createTableDescriptor(
3,
BUCKET_NUM,
Collections.emptyList(),
Collections.emptyList(),
customProperties,
false);
assertThatThrownBy(
() ->
admin.createTable(
logTablePathWithoutBucketkeys,
logTableWithoutBucketKeys2,
false)
.get())
.cause()
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_without_bucket_keys`
already exists in Paimon catalog, but the table schema is not compatible. "
+ "Existing schema:
UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT,
`__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[],
primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1,
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon,
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ "new schema: UpdateSchema{fields=[`c1`
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp`
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[],
options={bucket=-1, fluss.table.replication.factor=1,
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon,
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon
catalog or use a new table name.");
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]