xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized 
table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table 
{}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur 
exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s 
occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                nonStringPartitionKeys.add(partitionKey);
+            }
+        }
+
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: [%s]. All known partition keys are: [%s].",
+                            unknownPartitionKeys.stream()
+                                    .collect(Collectors.joining("', '", "'", 
"'")),
+                            allPartitionKeys.stream()
+                                    .collect(Collectors.joining("', '", "'", 
"'"))));
+        }
+
+        if (!nonStringPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Currently, specifying non-char or non-string type 
partition fields"

Review Comment:
   what about `"Currently, manually refreshing materialized table only supports 
specifying char and string type"
                                       + " partition keys. All specific 
partition keys with unsupported types are: [%s]."`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to