lsyldliu commented on code in PR #24844:
URL: https://github.com/apache/flink/pull/24844#discussion_r1615542185
##########
flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java:
##########
@@ -84,4 +95,48 @@ public DynamicTableSink createDynamicTableSink(Context
context) {
}
return super.createDynamicTableSink(context);
}
+
+ @Override
+ protected void validate(FactoryUtil.TableFactoryHelper helper) {
+ // Except format options, some formats like parquet and orc can not
list all supported
+ // options.
+ helper.validateExcept(
+ helper.getOptions().get(FactoryUtil.FORMAT) + ".",
PARTITION_FIELDS + ".");
+
+ // validate time zone of watermark
+ validateTimeZone(
+ helper.getOptions()
+
.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
+ }
+
+ /** Similar logic as for {@link TableConfig}. */
+ private void validateTimeZone(String zone) {
+ boolean isValid;
+ try {
+ // We enforce a zone string that is compatible with both
java.util.TimeZone and
+ // java.time.ZoneId to avoid bugs.
+ // In general, advertising either TZDB ID, GMT+xx:xx, or UTC is
the best we can do.
+ isValid =
java.util.TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone));
+ } catch (Exception e) {
+ isValid = false;
+ }
+
+ if (!isValid) {
+ throw new ValidationException(
+ String.format(
+ "Invalid time zone for '%s'. The value should be a
Time Zone Database (TZDB) ID "
+ + "such as 'America/Los_Angeles' to
include daylight saving time. Fixed "
+ + "offsets are supported using 'GMT-03:00'
or 'GMT+03:00'. Or use 'UTC' "
+ + "without time zone and daylight saving
time.",
+
FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE
+ .key()));
+ }
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
Review Comment:
I think this code snippet is not needed; it doesn't take effect.
##########
flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java:
##########
@@ -70,6 +80,7 @@ public DynamicTableSource createDynamicTableSource(Context
context) {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
+ // remove partition field options
Review Comment:
What is the purpose of this comment?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -356,6 +395,51 @@ private static ResultFetcher
callAlterMaterializedTableRefreshOperation(
}
}
+ private static Map<String, String> getPeriodRefreshPartition(
+ String scheduleTime,
+ ObjectIdentifier materializedTableIdentifier,
+ CatalogMaterializedTable materializedTable,
+ ZoneId localZoneId) {
+ if (scheduleTime == null) {
+ throw new ValidationException(
+ String.format(
+ "Scheduler time not properly set for periodic
refresh of table %s",
Review Comment:
```suggestion
"Scheduler time not properly set for periodic
refresh of materialized table %s",
```
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/materializedtable/MaterializedTableIdentifierParameter.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.materializedtable;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+/** {@link MessagePathParameter} that parses the materialized table
identifier. */
+public class MaterializedTableIdentifierParameter extends
MessagePathParameter<String> {
+
+ public static final String KEY = "identifier";
+
+ protected MaterializedTableIdentifierParameter() {
+ super(KEY);
+ }
+
+ @Override
+ protected String convertFromString(String value) throws
ConversionException {
Review Comment:
We can remove `throws ConversionException` to get rid of the checkstyle warning.
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -909,47 +694,172 @@ void testDropMaterializedTable(@InjectClusterClient
RestClusterClient<?> restClu
.asSerializableString()));
}
- private SessionHandle initializeSession() {
- SessionHandle sessionHandle =
service.openSession(defaultSessionEnvironment);
- String catalogDDL =
- String.format(
- "CREATE CATALOG %s\n"
- + "WITH (\n"
- + " 'type' = 'test-filesystem',\n"
- + " 'path' = '%s',\n"
- + " 'default-database' = '%s'\n"
- + " )",
- fileSystemCatalogName, fileSystemCatalogPath,
TEST_DEFAULT_DATABASE);
- service.configureSession(sessionHandle, catalogDDL, -1);
- service.configureSession(
- sessionHandle, String.format("USE CATALOG %s",
fileSystemCatalogName), -1);
-
- // create source table
- String dataGenSource =
- "CREATE TABLE datagenSource (\n"
- + " order_id BIGINT,\n"
- + " order_number VARCHAR(20),\n"
- + " user_id BIGINT,\n"
- + " shop_id BIGINT,\n"
- + " product_id BIGINT,\n"
- + " status BIGINT,\n"
- + " order_type BIGINT,\n"
- + " order_created_at TIMESTAMP,\n"
- + " payment_amount_cents BIGINT\n"
- + ")\n"
- + "WITH (\n"
- + " 'connector' = 'datagen',\n"
- + " 'rows-per-second' = '10'\n"
- + ")";
- service.configureSession(sessionHandle, dataGenSource, -1);
- return sessionHandle;
+ @Test
+ void testRefreshMaterializedTable() throws Exception {
+ long timeout = Duration.ofSeconds(20).toMillis();
+ long pause = Duration.ofSeconds(2).toMillis();
+
+ List<Row> data = new ArrayList<>();
+ data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+ data.add(Row.of(2L, 2L, 2L, "2024-01-01"));
+ data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+ data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
+ data.add(Row.of(5L, 5L, 5L, "2024-01-03"));
+ data.add(Row.of(6L, 6L, 6L, "2024-01-03"));
+ String dataId = TestValuesTableFactory.registerData(data);
+
+
createAndVerifyCreateMaterializedTableWithData("my_materialized_table", dataId,
data);
+
+ // remove element of partition '2024-01-02'
+ removePartitionValue(data, "2024-01-02");
+
+ // refresh the materialized table with static partition
+ long startTime = System.currentTimeMillis();
+ Map<String, String> staticPartitions = new HashMap<>();
+ staticPartitions.put("ds", "2024-01-02");
+ ObjectIdentifier objectIdentifier =
+ ObjectIdentifier.of(
+ fileSystemCatalogName, TEST_DEFAULT_DATABASE,
"my_materialized_table");
+ OperationHandle refreshTableHandle =
+ service.refreshMaterializedTable(
+ sessionHandle,
+ objectIdentifier.asSerializableString(),
+ false,
+ null,
+ Collections.emptyMap(),
+ staticPartitions,
+ Collections.emptyMap());
+
+ awaitOperationTermination(service, sessionHandle, refreshTableHandle);
+ List<RowData> result = fetchAllResults(service, sessionHandle,
refreshTableHandle);
+ assertThat(result.size()).isEqualTo(1);
+ String jobId = result.get(0).getString(0).toString();
+
+ // 1. verify fresh job created
+ verifyRefreshJobCreated(restClusterClient, jobId, startTime);
+
+ // 2. verify the new job overwrite the data
+ CommonTestUtils.waitUtil(
+ () ->
+ fetchTableData(sessionHandle, "SELECT * FROM
my_materialized_table").size()
Review Comment:
Since you have specified the static partitions, I think the materialized
table should have only one record in partition `2024-01-02`. Why does it have
five records?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/materializedtable/RefreshMaterializedTableHandler.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler.materializedtable;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.MaterializedTableIdentifierParameter;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableParameters;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableRequestBody;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableResponseBody;
+import
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler to execute materialized table refresh operation. */
+public class RefreshMaterializedTableHandler
+ extends AbstractSqlGatewayRestHandler<
+ RefreshMaterializedTableRequestBody,
+ RefreshMaterializedTableResponseBody,
+ RefreshMaterializedTableParameters> {
+
+ public RefreshMaterializedTableHandler(
+ SqlGatewayService service,
+ Map<String, String> responseHeaders,
+ MessageHeaders<
+ RefreshMaterializedTableRequestBody,
+ RefreshMaterializedTableResponseBody,
+ RefreshMaterializedTableParameters>
+ messageHeaders) {
+ super(service, responseHeaders, messageHeaders);
+ }
+
+ @Override
+ protected CompletableFuture<RefreshMaterializedTableResponseBody>
handleRequest(
+ @Nullable SqlGatewayRestAPIVersion version,
+ @Nonnull HandlerRequest<RefreshMaterializedTableRequestBody>
request)
+ throws RestHandlerException {
+ SessionHandle sessionHandle =
request.getPathParameter(SessionHandleIdPathParameter.class);
+ String materializedTableIdentifier =
+
request.getPathParameter(MaterializedTableIdentifierParameter.class);
+ boolean isPeriodic = request.getRequestBody().isPeriodic();
+ String scheduleTime = request.getRequestBody().getScheduleTime();
+ Map<String, String> dynamicOptions =
request.getRequestBody().getDynamicOptions();
+ Map<String, String> staticPartitions =
request.getRequestBody().getStaticPartitions();
+ Map<String, String> executionConfig =
request.getRequestBody().getExecutionConfig();
+ OperationHandle operationHandle =
+ service.refreshMaterializedTable(
Review Comment:
Should we add a try-catch around this code block?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -400,12 +484,17 @@ private static void validatePartitionSpec(
@VisibleForTesting
protected static String getManuallyRefreshStatement(
- String tableIdentifier, String query, Map<String, String>
partitionSpec) {
+ ObjectIdentifier tableIdentifier,
+ String query,
Review Comment:
query -> definitionQuery
##########
flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java:
##########
@@ -84,4 +95,48 @@ public DynamicTableSink createDynamicTableSink(Context
context) {
}
return super.createDynamicTableSink(context);
}
+
+ @Override
+ protected void validate(FactoryUtil.TableFactoryHelper helper) {
+ // Except format options, some formats like parquet and orc can not
list all supported
+ // options.
+ helper.validateExcept(
+ helper.getOptions().get(FactoryUtil.FORMAT) + ".",
PARTITION_FIELDS + ".");
+
+ // validate time zone of watermark
+ validateTimeZone(
+ helper.getOptions()
+
.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
+ }
+
+ /** Similar logic as for {@link TableConfig}. */
+ private void validateTimeZone(String zone) {
Review Comment:
I think we can change the visibility of
FileSystemTableFactory#validateTimeZone from private to protected, then we can
reuse it directly.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -356,6 +395,51 @@ private static ResultFetcher
callAlterMaterializedTableRefreshOperation(
}
}
+ private static Map<String, String> getPeriodRefreshPartition(
+ String scheduleTime,
+ ObjectIdentifier materializedTableIdentifier,
+ CatalogMaterializedTable materializedTable,
+ ZoneId localZoneId) {
+ if (scheduleTime == null) {
+ throw new ValidationException(
+ String.format(
+ "Scheduler time not properly set for periodic
refresh of table %s",
+ materializedTableIdentifier));
+ }
+
+ Map<String, String> options = materializedTable.getOptions();
+ Set<String> partitionFields =
+ options.keySet().stream()
+ .filter(k -> k.startsWith(PARTITION_FIELDS))
+ .collect(Collectors.toSet());
+ Map<String, String> refreshPartitions = new HashMap<>();
+ for (String partKey : partitionFields) {
+ String partField =
+ partKey.substring(
+ PARTITION_FIELDS.length() + 1,
+ partKey.length() - (DATE_FORMATTER.length() + 1));
+ String partFieldFormatter = options.get(partKey);
+ String partFiledValue =
+ formatTimestampString(
+ scheduleTime,
+ SCHEDULE_TIME_DATE_FORMATTER_DEFAULT,
+ partFieldFormatter,
+ TimeZone.getTimeZone(localZoneId));
+ if (partFiledValue == null) {
+ throw new SqlExecutionException(
+ String.format(
+ "Failed to parse a valid partition value for
the field '%s' in table %s using the scheduler time '%s' based on the date
format '%s'.",
Review Comment:
```suggestion
"Failed to parse a valid partition value for
the field '%s' in materialized table %s using the scheduler time '%s' based on
the date format '%s'.",
```
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.AbstractMaterializedTableStatementITCase;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import
org.apache.flink.table.gateway.rest.header.materializedtable.RefreshMaterializedTableHeaders;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableParameters;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableRequestBody;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableResponseBody;
+import
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
+import static
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test basic logic of handlers inherited from {@link
AbstractSqlGatewayRestHandler} in materialized
+ * table related cases.
+ */
+public class SqlGatewayRestEndpointMaterializedTableITCase
Review Comment:
Why do you have to add this test class separately? Can't you put it in
MaterializedTableStatementITCase?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/materializedtable/MaterializedTableIdentifierParameter.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.materializedtable;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+/** {@link MessagePathParameter} that parses the materialized table
identifier. */
+public class MaterializedTableIdentifierParameter extends
MessagePathParameter<String> {
Review Comment:
To align with other implementation classes, it would be better to name it
`MaterializedTableIdentifierPathParameter`
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -909,47 +694,172 @@ void testDropMaterializedTable(@InjectClusterClient
RestClusterClient<?> restClu
.asSerializableString()));
}
- private SessionHandle initializeSession() {
- SessionHandle sessionHandle =
service.openSession(defaultSessionEnvironment);
- String catalogDDL =
- String.format(
- "CREATE CATALOG %s\n"
- + "WITH (\n"
- + " 'type' = 'test-filesystem',\n"
- + " 'path' = '%s',\n"
- + " 'default-database' = '%s'\n"
- + " )",
- fileSystemCatalogName, fileSystemCatalogPath,
TEST_DEFAULT_DATABASE);
- service.configureSession(sessionHandle, catalogDDL, -1);
- service.configureSession(
- sessionHandle, String.format("USE CATALOG %s",
fileSystemCatalogName), -1);
-
- // create source table
- String dataGenSource =
- "CREATE TABLE datagenSource (\n"
- + " order_id BIGINT,\n"
- + " order_number VARCHAR(20),\n"
- + " user_id BIGINT,\n"
- + " shop_id BIGINT,\n"
- + " product_id BIGINT,\n"
- + " status BIGINT,\n"
- + " order_type BIGINT,\n"
- + " order_created_at TIMESTAMP,\n"
- + " payment_amount_cents BIGINT\n"
- + ")\n"
- + "WITH (\n"
- + " 'connector' = 'datagen',\n"
- + " 'rows-per-second' = '10'\n"
- + ")";
- service.configureSession(sessionHandle, dataGenSource, -1);
- return sessionHandle;
+ @Test
+ void testRefreshMaterializedTable() throws Exception {
+ long timeout = Duration.ofSeconds(20).toMillis();
+ long pause = Duration.ofSeconds(2).toMillis();
+
+ List<Row> data = new ArrayList<>();
+ data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+ data.add(Row.of(2L, 2L, 2L, "2024-01-01"));
+ data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+ data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
+ data.add(Row.of(5L, 5L, 5L, "2024-01-03"));
+ data.add(Row.of(6L, 6L, 6L, "2024-01-03"));
+ String dataId = TestValuesTableFactory.registerData(data);
+
+
createAndVerifyCreateMaterializedTableWithData("my_materialized_table", dataId,
data);
+
+ // remove element of partition '2024-01-02'
+ removePartitionValue(data, "2024-01-02");
+
+ // refresh the materialized table with static partition
+ long startTime = System.currentTimeMillis();
+ Map<String, String> staticPartitions = new HashMap<>();
+ staticPartitions.put("ds", "2024-01-02");
+ ObjectIdentifier objectIdentifier =
+ ObjectIdentifier.of(
+ fileSystemCatalogName, TEST_DEFAULT_DATABASE,
"my_materialized_table");
+ OperationHandle refreshTableHandle =
+ service.refreshMaterializedTable(
+ sessionHandle,
+ objectIdentifier.asSerializableString(),
+ false,
+ null,
+ Collections.emptyMap(),
+ staticPartitions,
+ Collections.emptyMap());
+
+ awaitOperationTermination(service, sessionHandle, refreshTableHandle);
+ List<RowData> result = fetchAllResults(service, sessionHandle,
refreshTableHandle);
+ assertThat(result.size()).isEqualTo(1);
+ String jobId = result.get(0).getString(0).toString();
+
+ // 1. verify fresh job created
+ verifyRefreshJobCreated(restClusterClient, jobId, startTime);
+
+ // 2. verify the new job overwrite the data
+ CommonTestUtils.waitUtil(
+ () ->
+ fetchTableData(sessionHandle, "SELECT * FROM
my_materialized_table").size()
+ == data.size(),
+ Duration.ofMillis(timeout),
+ Duration.ofMillis(pause),
+ "Failed to verify the data in materialized table.");
+ assertThat(
+ fetchTableData(
+ sessionHandle,
+ "SELECT * FROM my_materialized_table
where ds = '2024-01-02'")
+ .size())
+ .isEqualTo(1);
+
+ // remove element of partition '2024-01-03' and '2024-01-01'
+ // test refresh job only fresh partition '2024-01-03'
+ removePartitionValue(data, "2024-01-01");
+ removePartitionValue(data, "2024-01-03");
+ // refresh the materialized with period schedule
+ startTime = System.currentTimeMillis();
+ OperationHandle periodRefreshTableHandle =
+ service.refreshMaterializedTable(
+ sessionHandle,
+ objectIdentifier.asSerializableString(),
+ true,
+ "2024-01-03 00:00:00",
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ awaitOperationTermination(service, sessionHandle,
periodRefreshTableHandle);
+ List<RowData> periodRefreshResult =
+ fetchAllResults(service, sessionHandle,
periodRefreshTableHandle);
+ assertThat(periodRefreshResult.size()).isEqualTo(1);
+ String periodJobId =
periodRefreshResult.get(0).getString(0).toString();
+
+ // 1. verify fresh job created
+ verifyRefreshJobCreated(restClusterClient, periodJobId, startTime);
+
+ // 2. verify the new job overwrite the data
+ assertThat(
+ fetchTableData(
+ sessionHandle,
+ "SELECT * FROM my_materialized_table
where ds = '2024-01-03'")
+ .size())
+ .isEqualTo(getPartitionSize(data, "2024-01-03"));
+ // verify the data of partition '2024-01-01' is not changed
+ assertThat(
+ fetchTableData(
+ sessionHandle,
+ "SELECT * FROM my_materialized_table
where ds = '2024-01-01'")
+ .size())
+ .isNotEqualTo(getPartitionSize(data, "2024-01-01"));
+
+ // refresh the materialized table with schedule time not specified
+ OperationHandle invalidRefreshTableHandle1 =
+ service.refreshMaterializedTable(
+ sessionHandle,
+ objectIdentifier.asSerializableString(),
+ true,
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
+ assertThatThrownBy(
+ () ->
+ awaitOperationTermination(
+ service, sessionHandle,
invalidRefreshTableHandle1))
+ .rootCause()
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ String.format(
+ "Scheduler time not properly set for periodic
refresh of table %s",
+ ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "my_materialized_table")
+ .asSerializableString()));
+
+ // refresh the materialized table with invalid schedule time
+ String invalidTime = "20240103 00:00:00.000";
+ OperationHandle invalidRefreshTableHandle2 =
+ service.refreshMaterializedTable(
+ sessionHandle,
+ objectIdentifier.asSerializableString(),
+ true,
+ invalidTime,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ assertThatThrownBy(
+ () ->
+ awaitOperationTermination(
+ service, sessionHandle,
invalidRefreshTableHandle2))
+ .rootCause()
+ .isInstanceOf(SqlExecutionException.class)
+ .hasMessage(
+ String.format(
+ "Failed to parse a valid partition value for
the field 'ds' in table %s using the scheduler time '20240103 00:00:00.000'
based on the date format 'yyyy-MM-dd HH:mm:ss'.",
+ ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "my_materialized_table")
+ .asSerializableString()));
}
- private List<RowData> fetchTableData(SessionHandle sessionHandle, String
query) {
- OperationHandle queryHandle =
- service.executeStatement(sessionHandle, query, -1, new
Configuration());
+ private void removePartitionValue(List<Row> data, String partition) {
Review Comment:
This method name looks a little strange: you just remove one record in the
partition, but name it `removePartitionValue`. My core point is: why do you
need this method? Is there another way to achieve your test purpose?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -356,6 +395,51 @@ private static ResultFetcher
callAlterMaterializedTableRefreshOperation(
}
}
+ private static Map<String, String> getPeriodRefreshPartition(
Review Comment:
Can we add some unit tests (UT) for this method?
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.AbstractMaterializedTableStatementITCase;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import
org.apache.flink.table.gateway.rest.header.materializedtable.RefreshMaterializedTableHeaders;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableParameters;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableRequestBody;
+import
org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableResponseBody;
+import
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
+import static
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test basic logic of handlers inherited from {@link
AbstractSqlGatewayRestHandler} in materialized
+ * table related cases.
+ */
+public class SqlGatewayRestEndpointMaterializedTableITCase
+ extends AbstractMaterializedTableStatementITCase {
+
+ private static TestingRestClient restClient;
+
+ @RegisterExtension
+ @Order(4)
+ private static final SqlGatewayRestEndpointExtension
SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+ new
SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+ @BeforeAll
+ static void setup() throws Exception {
+ restClient = getTestingRestClient();
+ }
+
+ @Test
+ void testRefreshMaterializedTable() throws Exception {
Review Comment:
Rename to testRefreshMaterializedTableWithRestAPI.
BTW, is it possible to test two patterns with two test examples: specifying
partition.fields.ds.date-formatter and not specifying
partition.fields.ds.date-formatter when building a table?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##########
@@ -299,6 +303,50 @@ public ResolvedCatalogBaseTable<?> getTable(
}
}
+ @Override
+ public OperationHandle refreshMaterializedTable(
+ SessionHandle sessionHandle,
+ String materializedTableIdentifier,
+ boolean isPeriodic,
+ @Nullable String scheduleTime,
+ Map<String, String> dynamicOptions,
+ Map<String, String> staticPartitions,
+ Map<String, String> executionConfig) {
+ try {
+ return getSession(sessionHandle)
Review Comment:
I think the concrete processing logic should be handled by
`OperationExecutor`; `SqlGatewayServiceImpl` is just a proxy, so the better
code would be as follows:
```
return getSession(sessionHandle)
.getOperationManager()
.submitOperation(
handle ->
getSession(sessionHandle)
.createExecutor(Configuration.fromMap(executionConfig))
.refreshMaterializedTable(
handle,
materializedTableIdentifier,
isPeriodic,
scheduleTime,
staticPartitions,
dynamicOptions));
```
Then add the following code to `OperationExecutor`:
```
public ResultFetcher refreshMaterializedTable(
OperationHandle handle,
String materializedTableIdentifier,
boolean isPeriodic,
@Nullable String scheduleTime,
Map<String, String> staticPartitions,
Map<String, String> dynamicOptions) {
TableEnvironmentInternal tEnv = getTableEnvironment();
UnresolvedIdentifier unresolvedIdentifier =
tEnv.getParser().parseIdentifier(materializedTableIdentifier);
ObjectIdentifier objectIdentifier =
tEnv.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
return MaterializedTableManager.refreshMaterializedTable(
this,
handle,
objectIdentifier,
staticPartitions,
dynamicOptions,
isPeriodic,
scheduleTime);
}
```
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -909,47 +694,172 @@ void testDropMaterializedTable(@InjectClusterClient
RestClusterClient<?> restClu
.asSerializableString()));
}
- private SessionHandle initializeSession() {
- SessionHandle sessionHandle =
service.openSession(defaultSessionEnvironment);
- String catalogDDL =
- String.format(
- "CREATE CATALOG %s\n"
- + "WITH (\n"
- + " 'type' = 'test-filesystem',\n"
- + " 'path' = '%s',\n"
- + " 'default-database' = '%s'\n"
- + " )",
- fileSystemCatalogName, fileSystemCatalogPath,
TEST_DEFAULT_DATABASE);
- service.configureSession(sessionHandle, catalogDDL, -1);
- service.configureSession(
- sessionHandle, String.format("USE CATALOG %s",
fileSystemCatalogName), -1);
-
- // create source table
- String dataGenSource =
- "CREATE TABLE datagenSource (\n"
- + " order_id BIGINT,\n"
- + " order_number VARCHAR(20),\n"
- + " user_id BIGINT,\n"
- + " shop_id BIGINT,\n"
- + " product_id BIGINT,\n"
- + " status BIGINT,\n"
- + " order_type BIGINT,\n"
- + " order_created_at TIMESTAMP,\n"
- + " payment_amount_cents BIGINT\n"
- + ")\n"
- + "WITH (\n"
- + " 'connector' = 'datagen',\n"
- + " 'rows-per-second' = '10'\n"
- + ")";
- service.configureSession(sessionHandle, dataGenSource, -1);
- return sessionHandle;
+ @Test
+ void testRefreshMaterializedTable() throws Exception {
Review Comment:
This large test covers three different patterns. Can we split it into three
separate tests so that each one is clearer?
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -909,47 +694,172 @@ void testDropMaterializedTable(@InjectClusterClient
RestClusterClient<?> restClu
.asSerializableString()));
}
- private SessionHandle initializeSession() {
- SessionHandle sessionHandle =
service.openSession(defaultSessionEnvironment);
- String catalogDDL =
- String.format(
- "CREATE CATALOG %s\n"
- + "WITH (\n"
- + " 'type' = 'test-filesystem',\n"
- + " 'path' = '%s',\n"
- + " 'default-database' = '%s'\n"
- + " )",
- fileSystemCatalogName, fileSystemCatalogPath,
TEST_DEFAULT_DATABASE);
- service.configureSession(sessionHandle, catalogDDL, -1);
- service.configureSession(
- sessionHandle, String.format("USE CATALOG %s",
fileSystemCatalogName), -1);
-
- // create source table
- String dataGenSource =
- "CREATE TABLE datagenSource (\n"
- + " order_id BIGINT,\n"
- + " order_number VARCHAR(20),\n"
- + " user_id BIGINT,\n"
- + " shop_id BIGINT,\n"
- + " product_id BIGINT,\n"
- + " status BIGINT,\n"
- + " order_type BIGINT,\n"
- + " order_created_at TIMESTAMP,\n"
- + " payment_amount_cents BIGINT\n"
- + ")\n"
- + "WITH (\n"
- + " 'connector' = 'datagen',\n"
- + " 'rows-per-second' = '10'\n"
- + ")";
- service.configureSession(sessionHandle, dataGenSource, -1);
- return sessionHandle;
+ @Test
+ void testRefreshMaterializedTable() throws Exception {
+ long timeout = Duration.ofSeconds(20).toMillis();
+ long pause = Duration.ofSeconds(2).toMillis();
+
+ List<Row> data = new ArrayList<>();
+ data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+ data.add(Row.of(2L, 2L, 2L, "2024-01-01"));
+ data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+ data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
+ data.add(Row.of(5L, 5L, 5L, "2024-01-03"));
+ data.add(Row.of(6L, 6L, 6L, "2024-01-03"));
+ String dataId = TestValuesTableFactory.registerData(data);
+
+
createAndVerifyCreateMaterializedTableWithData("my_materialized_table", dataId,
data);
+
+ // remove element of partition '2024-01-02'
+ removePartitionValue(data, "2024-01-02");
+
+ // refresh the materialized table with static partition
+ long startTime = System.currentTimeMillis();
+ Map<String, String> staticPartitions = new HashMap<>();
+ staticPartitions.put("ds", "2024-01-02");
+ ObjectIdentifier objectIdentifier =
+ ObjectIdentifier.of(
+ fileSystemCatalogName, TEST_DEFAULT_DATABASE,
"my_materialized_table");
+ OperationHandle refreshTableHandle =
+ service.refreshMaterializedTable(
+ sessionHandle,
+ objectIdentifier.asSerializableString(),
+ false,
+ null,
+ Collections.emptyMap(),
+ staticPartitions,
+ Collections.emptyMap());
+
+ awaitOperationTermination(service, sessionHandle, refreshTableHandle);
+ List<RowData> result = fetchAllResults(service, sessionHandle,
refreshTableHandle);
+ assertThat(result.size()).isEqualTo(1);
+ String jobId = result.get(0).getString(0).toString();
+
+ // 1. verify fresh job created
+ verifyRefreshJobCreated(restClusterClient, jobId, startTime);
+
+ // 2. verify the new job overwrite the data
+ CommonTestUtils.waitUtil(
+ () ->
+ fetchTableData(sessionHandle, "SELECT * FROM
my_materialized_table").size()
+ == data.size(),
+ Duration.ofMillis(timeout),
+ Duration.ofMillis(pause),
+ "Failed to verify the data in materialized table.");
+ assertThat(
+ fetchTableData(
+ sessionHandle,
+ "SELECT * FROM my_materialized_table
where ds = '2024-01-02'")
+ .size())
+ .isEqualTo(1);
+
+ // remove element of partition '2024-01-03' and '2024-01-01'
+ // test refresh job only fresh partition '2024-01-03'
+ removePartitionValue(data, "2024-01-01");
Review Comment:
I think it would be better if we add records for the partition instead of
deleting them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]