Copilot commented on code in PR #2365:
URL: https://github.com/apache/fluss/pull/2365#discussion_r2688654738
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java:
##########
@@ -106,4 +117,197 @@ void testCreateAndGetTable() throws Exception {
.isInstanceOf(TableAlreadyExistException.class)
.hasMessage("Table %s already exists.", lakeTablePath);
}
+
+ @Test
+ void testAlterTableDatalakeFreshness() throws Exception {
+ AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+ AdminGateway adminGateway = getAdminGateway();
+
+ String db1 = "test_alter_freshness_db";
+ String tb1 = "tb1";
+ TablePath tablePath = TablePath.of(db1, tb1);
+ // first create a database
+ adminGateway.createDatabase(newCreateDatabaseRequest(db1,
false)).get();
+
+ // create a table with datalake enabled and initial freshness
+ Map<String, String> initialProperties = new HashMap<>();
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
"5min");
+ TableDescriptor tableDescriptor =
newPkTable().withProperties(initialProperties);
+ adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
+
+ // get the table and check initial freshness
+ GetTableInfoResponse response =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTable =
TableDescriptor.fromJsonBytes(response.getTableJson());
+
assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+ .isEqualTo("5min");
+
+ // alter table to change datalake freshness
+ Map<String, String> setProperties = new HashMap<>();
+ setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
"3min");
+
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(setProperties, new
ArrayList<>()),
+ Collections.emptyList(),
+ false))
+ .get();
+
+ // get the table and check updated freshness
+ GetTableInfoResponse responseAfterAlter =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTableAfterAlter =
+
TableDescriptor.fromJsonBytes(responseAfterAlter.getTableJson());
+
+ String freshnessAfterAlter =
+ gottenTableAfterAlter
+ .getProperties()
+ .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+ assertThat(freshnessAfterAlter).isEqualTo("3min");
+
+ // cleanup
+ adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+ adminGateway.dropDatabase(newDropDatabaseRequest(db1, false,
true)).get();
+ }
+
+ @Test
+ void testResetTableDatalakeProperties() throws Exception {
+ AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+ AdminGateway adminGateway = getAdminGateway();
+
+ String db1 = "test_reset_datalake_db";
+ String tb1 = "tb1";
+ TablePath tablePath = TablePath.of(db1, tb1);
+ // first create a database
+ adminGateway.createDatabase(newCreateDatabaseRequest(db1,
false)).get();
+
+ // create a table with datalake enabled and custom freshness
+ Map<String, String> initialProperties = new HashMap<>();
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
+ initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
"3min");
+ TableDescriptor tableDescriptor =
newPkTable().withProperties(initialProperties);
+ adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
+
+ // get the table and check initial state
+ GetTableInfoResponse response =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTable =
TableDescriptor.fromJsonBytes(response.getTableJson());
+
assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+ .isEqualTo("true");
+
assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+ .isEqualTo("3min");
+
+ // reset datalake freshness property
+ List<String> resetProperties = new ArrayList<>();
+ resetProperties.add(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(new HashMap<>(),
resetProperties),
+ Collections.emptyList(),
+ false))
+ .get();
+
+ // get the table and check freshness is removed
+ GetTableInfoResponse responseAfterReset =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTableAfterReset =
+
TableDescriptor.fromJsonBytes(responseAfterReset.getTableJson());
+
+ // freshness should be removed from properties
+ assertThat(
+ gottenTableAfterReset
+ .getProperties()
+
.containsKey(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+ .isFalse();
+ // but datalake.enabled should still be there
+ assertThat(
+ gottenTableAfterReset
+ .getProperties()
+
.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+ .isEqualTo("true");
+
+ // reset datalake enabled property
+ List<String> resetProperties2 = new ArrayList<>();
+ resetProperties2.add(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ alterTableProperties(new HashMap<>(),
resetProperties2),
+ Collections.emptyList(),
+ false))
+ .get();
+
+ // get the table and check datalake enabled is removed
+ GetTableInfoResponse responseAfterReset2 =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTableAfterReset2 =
+
TableDescriptor.fromJsonBytes(responseAfterReset2.getTableJson());
+
+ // datalake.enabled should be removed from properties
+ assertThat(
+ gottenTableAfterReset2
+ .getProperties()
+
.containsKey(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+ .isFalse();
+
+ // cleanup
+ adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+ adminGateway.dropDatabase(newDropDatabaseRequest(db1, false,
true)).get();
+ }
+
+ private AdminReadOnlyGateway getAdminOnlyGateway(boolean
isCoordinatorServer) {
+ if (isCoordinatorServer) {
+ return getAdminGateway();
+ } else {
+ return FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(0);
+ }
+ }
+
+ private AdminGateway getAdminGateway() {
+ return FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+ }
+
+ private static TableDescriptor newPkTable() {
+ return TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a comment")
+ .column("b", DataTypes.STRING())
+ .primaryKey("a")
+ .build())
+ .comment("first table")
+ .distributedBy(3, "a")
+ .build();
+ }
+
+ private static List<PbAlterConfig> alterTableProperties(
+ Map<String, String> setProperties, List<String> resetProperties) {
+ List<PbAlterConfig> res = new ArrayList<>();
+
+ for (Map.Entry<String, String> entry : setProperties.entrySet()) {
+ PbAlterConfig info = new PbAlterConfig();
+ info.setConfigKey(entry.getKey());
+ info.setConfigValue(entry.getValue());
+ info.setOpType(AlterConfigOpType.SET.value());
+ res.add(info);
+ }
+
+ for (String resetProperty : resetProperties) {
+ PbAlterConfig info = new PbAlterConfig();
+ info.setConfigKey(resetProperty);
+ info.setOpType(AlterConfigOpType.DELETE.value());
+ res.add(info);
+ }
+
+ return res;
+ }
Review Comment:
The helper methods `newPkTable()` and `alterTableProperties()` are
duplicated from `TableManagerITCase.java` (lines 815-843). Consider extracting
these common test utilities to a shared test utility class to avoid code
duplication and maintain consistency across test files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]