xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
"Submit continuous refresh job for materialized table {}
occur exception.",
materializedTableIdentifier,
e);
- throw new TableException(
+ throw new SqlExecutionException(
String.format(
"Submit continuous refresh job for materialized
table %s occur exception.",
materializedTableIdentifier),
e);
}
}
+ private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ AlterMaterializedTableRefreshOperation
alterMaterializedTableRefreshOperation) {
+ ObjectIdentifier materializedTableIdentifier =
+ alterMaterializedTableRefreshOperation.getTableIdentifier();
+ ResolvedCatalogBaseTable<?> table =
operationExecutor.getTable(materializedTableIdentifier);
+ if (MATERIALIZED_TABLE != table.getTableKind()) {
+ throw new ValidationException(
+ String.format(
+ "The table '%s' is not a materialized table.",
+ materializedTableIdentifier));
+ }
+
+ ResolvedCatalogMaterializedTable materializedTable =
+ (ResolvedCatalogMaterializedTable) table;
+
+ Map<String, String> partitionSpec =
+ alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+ validatePartitionSpec(partitionSpec,
(ResolvedCatalogMaterializedTable) table);
+
+ // Set job name, runtime mode
+ Configuration customConfig = new Configuration();
+ String jobName =
+ String.format(
+ "Materialized_table_%s_one_time_refresh_job",
+ materializedTableIdentifier.asSerializableString());
+ customConfig.set(NAME, jobName);
+ customConfig.set(RUNTIME_MODE, BATCH);
+
+ StringBuilder insertStatement =
+ new StringBuilder(
+ String.format(
+ "INSERT OVERWRITE %s SELECT * FROM (%s)",
+ materializedTableIdentifier,
+ materializedTable.getDefinitionQuery()));
+
+ if (!partitionSpec.isEmpty()) {
+ insertStatement.append(" WHERE ");
+ insertStatement.append(
+ partitionSpec.entrySet().stream()
+ .map(
+ entry ->
+ String.format(
+ "%s = '%s'",
entry.getKey(), entry.getValue()))
+ .reduce((s1, s2) -> s1 + " AND " + s2)
+ .get());
+ }
+
+ try {
+ LOG.debug(
+ "Begin to manually refreshing the materialization table
{}, statement: {}",
+ materializedTableIdentifier,
+ insertStatement);
+ return operationExecutor.executeStatement(
+ handle, customConfig, insertStatement.toString());
+ } catch (Exception e) {
+ // log and throw exception
+ LOG.error(
+ "Manually refreshing the materialization table {} occur
exception.",
+ materializedTableIdentifier,
+ e);
+ throw new SqlExecutionException(
+ String.format(
+ "Manually refreshing the materialization table %s
occur exception.",
+ materializedTableIdentifier),
+ e);
+ }
+ }
+
+ private static void validatePartitionSpec(
+ Map<String, String> partitionSpec,
ResolvedCatalogMaterializedTable table) {
+ ResolvedSchema schema = table.getResolvedSchema();
+ Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+ Set<String> unknownPartitionKeys = new HashSet<>();
+ Set<String> nonStringPartitionKeys = new HashSet<>();
+
+ for (String partitionKey : partitionSpec.keySet()) {
+ if (!schema.getColumn(partitionKey).isPresent()) {
+ unknownPartitionKeys.add(partitionKey);
+ continue;
+ }
+
+ if (!schema.getColumn(partitionKey)
+ .get()
+ .getDataType()
+ .getLogicalType()
+ .getTypeRoot()
+ .getFamilies()
+ .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+ nonStringPartitionKeys.add(partitionKey);
+ }
+ }
+
+ if (!unknownPartitionKeys.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "The partition spec contains unknown partition
keys: [%s]. All known partition keys are: [%s].",
+ unknownPartitionKeys.stream()
+ .collect(Collectors.joining("', '", "'",
"'")),
+ allPartitionKeys.stream()
+ .collect(Collectors.joining("', '", "'",
"'"))));
+ }
+
+ if (!nonStringPartitionKeys.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Currently, specifying non-char or non-string type
partition fields"
Review Comment:
What about `"Currently, manually refreshing a materialized table only supports
specifying char and string type partition keys. All specified partition keys
with unsupported types are:\n\n [%s]."`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]