This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1378979f02e [FLINK-35346][table-common] Introduce workflow scheduler 
interface for materialized table
1378979f02e is described below

commit 1378979f02eed55bbf3f91b08ec166d55b2c42a6
Author: Ron <ron9....@gmail.com>
AuthorDate: Thu May 16 19:41:54 2024 +0800

    [FLINK-35346][table-common] Introduce workflow scheduler interface for 
materialized table
    
    [FLINK-35346][table-common] Introduce workflow scheduler interface for 
materialized table
    
    This closes #24767
---
 .../apache/flink/table/factories/FactoryUtil.java  |   9 +-
 .../table/factories/WorkflowSchedulerFactory.java  |  56 +++++++
 .../factories/WorkflowSchedulerFactoryUtil.java    | 156 ++++++++++++++++++
 .../table/workflow/CreateRefreshWorkflow.java      |  29 ++++
 .../table/workflow/DeleteRefreshWorkflow.java      |  48 ++++++
 .../table/workflow/ModifyRefreshWorkflow.java      |  40 +++++
 .../flink/table/workflow/RefreshWorkflow.java      |  34 ++++
 .../flink/table/workflow/WorkflowException.java    |  37 +++++
 .../flink/table/workflow/WorkflowScheduler.java    |  91 +++++++++++
 .../workflow/TestWorkflowSchedulerFactory.java     | 175 +++++++++++++++++++++
 .../workflow/WorkflowSchedulerFactoryUtilTest.java | 107 +++++++++++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 12 files changed, 782 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index d8d6d7e9000..5d66b23c3d8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -167,6 +167,13 @@ public final class FactoryUtil {
                                     + "tasks to advance their watermarks 
without the need to wait for "
                                     + "watermarks from this source while it is 
idle.");
 
+    public static final ConfigOption<String> WORKFLOW_SCHEDULER_TYPE =
+            ConfigOptions.key("workflow-scheduler.type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify the workflow scheduler type that is used 
for materialized table.");
+
     /**
      * Suffix for keys of {@link ConfigOption} in case a connector requires 
multiple formats (e.g.
      * for both key and value).
@@ -903,7 +910,7 @@ public final class FactoryUtil {
         return loadResults;
     }
 
-    private static String stringifyOption(String key, String value) {
+    public static String stringifyOption(String key, String value) {
         if (GlobalConfiguration.isSensitive(key)) {
             value = HIDDEN_CONTENT;
         }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactory.java
new file mode 100644
index 00000000000..72e144f7d19
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.workflow.WorkflowScheduler;
+
+import java.util.Map;
+
+/**
+ * A factory to create a {@link WorkflowScheduler} instance.
+ *
+ * <p>See {@link Factory} for more information about the general design of a 
factory.
+ */
+@PublicEvolving
+public interface WorkflowSchedulerFactory extends Factory {
+
+    /** Create a workflow scheduler instance which interacts with external 
scheduler service. */
+    WorkflowScheduler<?> createWorkflowScheduler(Context context);
+
+    /** Context provided when a workflow scheduler is created. */
+    @PublicEvolving
+    interface Context {
+
+        /** Gives the config option to create {@link WorkflowScheduler}. */
+        ReadableConfig getConfiguration();
+
+        /**
+         * Returns the options with which the workflow scheduler is created. 
All options that are
+         * prefixed with the workflow scheduler identifier are included in the 
map.
+         *
+         * <p>All the keys in the options are pruned with the prefix. For 
example, the option {@code
+         * workflow-scheduler.airflow.endpoint}'s key is {@code endpoint} in 
the map.
+         *
+         * <p>An implementation should perform validation of these options.
+         */
+        Map<String, String> getWorkflowSchedulerOptions();
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java
new file mode 100644
index 00000000000..593d6b47d6a
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.workflow.WorkflowScheduler;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import static 
org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE;
+import static org.apache.flink.table.factories.FactoryUtil.stringifyOption;
+
+/** Utility for working with {@link WorkflowScheduler}. */
+@PublicEvolving
+public class WorkflowSchedulerFactoryUtil {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WorkflowSchedulerFactoryUtil.class);
+
+    public static final String WORKFLOW_SCHEDULER_PREFIX = 
"workflow-scheduler";
+
+    private WorkflowSchedulerFactoryUtil() {
+        // no instantiation
+    }
+
+    /**
+     * Attempts to discover the appropriate workflow scheduler factory and 
creates the instance of
+     * the scheduler. Return null directly if doesn't specify the workflow 
scheduler in config
+     * because it is optional for materialized table.
+     */
+    public static @Nullable WorkflowScheduler<?> createWorkflowScheduler(
+            Configuration configuration, ClassLoader classLoader) {
+        // Workflow scheduler identifier
+        String identifier = configuration.get(WORKFLOW_SCHEDULER_TYPE);
+        if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
+            LOG.warn(
+                    "Workflow scheduler options do not contain an option key 
'%s' for discovering an workflow scheduler.");
+            return null;
+        }
+
+        try {
+            final WorkflowSchedulerFactory factory =
+                    FactoryUtil.discoverFactory(
+                            classLoader, WorkflowSchedulerFactory.class, 
identifier);
+            return factory.createWorkflowScheduler(
+                    new DefaultWorkflowSchedulerContext(
+                            configuration, 
getWorkflowSchedulerConfig(configuration, identifier)));
+        } catch (Throwable t) {
+            throw new ValidationException(
+                    String.format(
+                            "Error creating workflow scheduler '%s' in option 
space '%s'.",
+                            identifier,
+                            configuration.toMap().entrySet().stream()
+                                    .map(
+                                            optionEntry ->
+                                                    stringifyOption(
+                                                            
optionEntry.getKey(),
+                                                            
optionEntry.getValue()))
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))),
+                    t);
+        }
+    }
+
+    private static Map<String, String> getWorkflowSchedulerConfig(
+            Configuration flinkConf, String identifier) {
+        return new DelegatingConfiguration(flinkConf, 
getWorkflowSchedulerOptionPrefix(identifier))
+                .toMap();
+    }
+
+    private static String getWorkflowSchedulerOptionPrefix(String identifier) {
+        return String.format("%s.%s.", WORKFLOW_SCHEDULER_PREFIX, identifier);
+    }
+
+    /**
+     * Creates a utility that helps to validate options for a {@link 
WorkflowSchedulerFactory}.
+     *
+     * <p>Note: This utility checks for left-over options in the final step.
+     */
+    public static WorkflowSchedulerFactoryHelper 
createWorkflowSchedulerFactoryHelper(
+            WorkflowSchedulerFactory workflowSchedulerFactory,
+            WorkflowSchedulerFactory.Context context) {
+        return new WorkflowSchedulerFactoryHelper(
+                workflowSchedulerFactory, 
context.getWorkflowSchedulerOptions());
+    }
+
+    /**
+     * Helper utility for validating all options for a {@link 
WorkflowSchedulerFactory}.
+     *
+     * @see #createWorkflowSchedulerFactoryHelper(WorkflowSchedulerFactory,
+     *     WorkflowSchedulerFactory.Context)
+     */
+    @PublicEvolving
+    public static class WorkflowSchedulerFactoryHelper
+            extends FactoryUtil.FactoryHelper<WorkflowSchedulerFactory> {
+
+        public WorkflowSchedulerFactoryHelper(
+                WorkflowSchedulerFactory workflowSchedulerFactory,
+                Map<String, String> configOptions) {
+            super(workflowSchedulerFactory, configOptions, PROPERTY_VERSION);
+        }
+    }
+
+    /** Default implementation of {@link WorkflowSchedulerFactory.Context}. */
+    @Internal
+    public static class DefaultWorkflowSchedulerContext
+            implements WorkflowSchedulerFactory.Context {
+
+        private final ReadableConfig configuration;
+        private final Map<String, String> workflowSchedulerConfig;
+
+        public DefaultWorkflowSchedulerContext(
+                ReadableConfig configuration, Map<String, String> 
workflowSchedulerConfig) {
+            this.configuration = configuration;
+            this.workflowSchedulerConfig = workflowSchedulerConfig;
+        }
+
+        @Override
+        public ReadableConfig getConfiguration() {
+            return configuration;
+        }
+
+        @Override
+        public Map<String, String> getWorkflowSchedulerOptions() {
+            return workflowSchedulerConfig;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/CreateRefreshWorkflow.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/CreateRefreshWorkflow.java
new file mode 100644
index 00000000000..0ca0ebc37dc
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/CreateRefreshWorkflow.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+
+/**
+ * {@link CreateRefreshWorkflow} provides the related information to create 
refresh workflow of
+ * {@link CatalogMaterializedTable}.
+ */
+@PublicEvolving
+public interface CreateRefreshWorkflow extends RefreshWorkflow {}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/DeleteRefreshWorkflow.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/DeleteRefreshWorkflow.java
new file mode 100644
index 00000000000..44508b71e7d
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/DeleteRefreshWorkflow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.refresh.RefreshHandler;
+
+/**
+ * {@link DeleteRefreshWorkflow} provides the related information to delete 
refresh workflow of
+ * {@link CatalogMaterializedTable}.
+ *
+ * @param <T> The type of {@link RefreshHandler} used by specific {@link 
WorkflowScheduler} to
+ *     locate the refresh workflow in scheduler service.
+ */
+@PublicEvolving
+public class DeleteRefreshWorkflow<T extends RefreshHandler> implements 
RefreshWorkflow {
+
+    private final T refreshHandler;
+
+    public DeleteRefreshWorkflow(T refreshHandler) {
+        this.refreshHandler = refreshHandler;
+    }
+
+    /**
+     * Return {@link RefreshHandler} from corresponding {@link 
WorkflowScheduler} which provides
+     * meta info to points to the refresh workflow in scheduler service.
+     */
+    public T getRefreshHandler() {
+        return refreshHandler;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/ModifyRefreshWorkflow.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/ModifyRefreshWorkflow.java
new file mode 100644
index 00000000000..f4e87ad6075
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/ModifyRefreshWorkflow.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.refresh.RefreshHandler;
+
+/**
+ * {@link ModifyRefreshWorkflow} provides the related information to modify 
refresh workflow of
+ * {@link CatalogMaterializedTable}.
+ *
+ * @param <T> The type of {@link RefreshHandler} used by specific {@link 
WorkflowScheduler} to
+ *     locate the refresh workflow in scheduler service.
+ */
+@PublicEvolving
+public interface ModifyRefreshWorkflow<T extends RefreshHandler> extends 
RefreshWorkflow {
+
+    /**
+     * Return {@link RefreshHandler} from corresponding {@link 
WorkflowScheduler} which provides
+     * meta info to points to the refresh workflow in scheduler service.
+     */
+    T getRefreshHandler();
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/RefreshWorkflow.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/RefreshWorkflow.java
new file mode 100644
index 00000000000..0dd839f4b68
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/RefreshWorkflow.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+
+/**
+ * {@link RefreshWorkflow} is the basic interface that provide the related 
information to operate
+ * the refresh workflow of {@link CatalogMaterializedTable}, the operation of 
refresh workflow
+ * include create, modify, drop, etc.
+ *
+ * @see CreateRefreshWorkflow
+ * @see ModifyRefreshWorkflow
+ * @see DeleteRefreshWorkflow
+ */
+@PublicEvolving
+public interface RefreshWorkflow {}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java
new file mode 100644
index 00000000000..5155d5265ec
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A workflow-related operation exception to materialized table, including 
create, suspend, resume,
+ * drop workflow operation, etc.
+ */
+@PublicEvolving
+public class WorkflowException extends Exception {
+
+    public WorkflowException(String message) {
+        super(message);
+    }
+
+    public WorkflowException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java
new file mode 100644
index 00000000000..ce630a3241d
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.workflow;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.refresh.RefreshHandler;
+import org.apache.flink.table.refresh.RefreshHandlerSerializer;
+
+/**
+ * This interface is used to interact with specific workflow scheduler 
services that support
+ * creating, modifying, and deleting refreshed workflow of Materialized Table.
+ *
+ * @param <T> The type of {@link RefreshHandler} used by specific {@link 
WorkflowScheduler} to
+ *     locate the refresh workflow in scheduler service.
+ */
+@PublicEvolving
+public interface WorkflowScheduler<T extends RefreshHandler> {
+
+    /**
+     * Open this workflow scheduler instance. Used for any required 
preparation in initialization
+     * phase.
+     *
+     * @throws WorkflowException if initializing workflow scheduler occur 
exception
+     */
+    void open() throws WorkflowException;
+
+    /**
+     * Close this workflow scheduler when it is no longer needed and release 
any resource that it
+     * might be holding.
+     *
+     * @throws WorkflowException if closing the related resources of workflow 
scheduler failed
+     */
+    void close() throws WorkflowException;
+
+    /**
+     * Return a {@link RefreshHandlerSerializer} instance to serialize and 
deserialize {@link
+     * RefreshHandler} created by specific workflow scheduler service.
+     */
+    RefreshHandlerSerializer<T> getRefreshHandlerSerializer();
+
+    /**
+     * Create a refresh workflow in specific scheduler service for the 
materialized table, return a
+     * {@link RefreshHandler} instance which can locate the refresh workflow 
detail information.
+     *
+     * <p>This method supports creating workflow for periodic refresh, as well 
as workflow for a
+     * one-time refresh only.
+     *
+     * @param createRefreshWorkflow The detail info for create refresh 
workflow of materialized
+     *     table.
+     * @return The meta info which points to the refresh workflow in scheduler 
service.
+     * @throws WorkflowException if creating refresh workflow failed
+     */
+    T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) 
throws WorkflowException;
+
+    /**
+     * Modify the refresh workflow status in scheduler service. This includes 
suspend, resume,
+     * modify schedule cron operation, and so on.
+     *
+     * @param modifyRefreshWorkflow The detail info for modify refresh 
workflow of materialized
+     *     table.
+     * @throws WorkflowException if modify refresh workflow failed
+     */
+    void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow)
+            throws WorkflowException;
+
+    /**
+     * Delete the refresh workflow in scheduler service.
+     *
+     * @param deleteRefreshWorkflow The detail info for delete refresh 
workflow of materialized
+     *     table.
+     * @throws WorkflowException if delete refresh workflow failed
+     */
+    void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow)
+            throws WorkflowException;
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/TestWorkflowSchedulerFactory.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/TestWorkflowSchedulerFactory.java
new file mode 100644
index 00000000000..ae550947853
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/TestWorkflowSchedulerFactory.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.workflow;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.WorkflowSchedulerFactory;
+import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil;
+import org.apache.flink.table.refresh.RefreshHandler;
+import org.apache.flink.table.refresh.RefreshHandlerSerializer;
+import org.apache.flink.table.workflow.CreateRefreshWorkflow;
+import org.apache.flink.table.workflow.DeleteRefreshWorkflow;
+import org.apache.flink.table.workflow.ModifyRefreshWorkflow;
+import org.apache.flink.table.workflow.WorkflowException;
+import org.apache.flink.table.workflow.WorkflowScheduler;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/** This class is an implementation of {@link WorkflowSchedulerFactory} for 
testing purposes. */
+public class TestWorkflowSchedulerFactory implements WorkflowSchedulerFactory {
+
+    public static final String IDENTIFIER = "test";
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("user-name").stringType().noDefaultValue();
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password").stringType().noDefaultValue();
+    public static final ConfigOption<String> PROJECT_NAME =
+            ConfigOptions.key("project-name").stringType().noDefaultValue();
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROJECT_NAME);
+        return options;
+    }
+
+    @Override
+    public WorkflowScheduler<?> createWorkflowScheduler(Context context) {
+        WorkflowSchedulerFactoryUtil.WorkflowSchedulerFactoryHelper helper =
+                
WorkflowSchedulerFactoryUtil.createWorkflowSchedulerFactoryHelper(this, 
context);
+        helper.validate();
+
+        return new TestWorkflowScheduler(
+                helper.getOptions().get(USERNAME),
+                helper.getOptions().get(PASSWORD),
+                helper.getOptions().get(PROJECT_NAME));
+    }
+
+    /** Test workflow scheduler for discovery testing. */
+    public static class TestWorkflowScheduler implements 
WorkflowScheduler<TestRefreshHandler> {
+
+        private final String userName;
+        private final String password;
+        private final String projectName;
+
+        public TestWorkflowScheduler(String userName, String password, String 
projectName) {
+            this.userName = userName;
+            this.password = password;
+            this.projectName = projectName;
+        }
+
+        @Override
+        public void open() throws WorkflowException {}
+
+        @Override
+        public void close() throws WorkflowException {}
+
+        @Override
+        public RefreshHandlerSerializer<TestRefreshHandler> 
getRefreshHandlerSerializer() {
+            return TestRefreshHandlerSerializer.INSTANCE;
+        }
+
+        @Override
+        public TestRefreshHandler createRefreshWorkflow(CreateRefreshWorkflow 
createRefreshWorkflow)
+                throws WorkflowException {
+            return TestRefreshHandler.INSTANCE;
+        }
+
+        @Override
+        public void modifyRefreshWorkflow(
+                ModifyRefreshWorkflow<TestRefreshHandler> 
modifyRefreshWorkflow)
+                throws WorkflowException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void deleteRefreshWorkflow(
+                DeleteRefreshWorkflow<TestRefreshHandler> 
deleteRefreshWorkflow)
+                throws WorkflowException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestWorkflowScheduler that = (TestWorkflowScheduler) o;
+            return Objects.equals(userName, that.userName)
+                    && Objects.equals(password, that.password)
+                    && Objects.equals(projectName, that.projectName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(userName, password, projectName);
+        }
+    }
+
+    /** Test refresh handler for discovery testing. */
+    public static class TestRefreshHandler implements RefreshHandler {
+
+        public static final TestRefreshHandler INSTANCE = new 
TestRefreshHandler();
+
+        @Override
+        public String asSummaryString() {
+            return "Test RefreshHandler";
+        }
+    }
+
+    /** Test refresh handler serializer for discovery testing. */
+    public static class TestRefreshHandlerSerializer
+            implements RefreshHandlerSerializer<TestRefreshHandler> {
+
+        public static final TestRefreshHandlerSerializer INSTANCE =
+                new TestRefreshHandlerSerializer();
+
+        @Override
+        public byte[] serialize(TestRefreshHandler refreshHandler) throws 
IOException {
+            return new byte[0];
+        }
+
+        @Override
+        public TestRefreshHandler deserialize(byte[] serializedBytes, 
ClassLoader cl)
+                throws IOException, ClassNotFoundException {
+            return TestRefreshHandler.INSTANCE;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/WorkflowSchedulerFactoryUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/WorkflowSchedulerFactoryUtilTest.java
new file mode 100644
index 00000000000..d61ff5f04e2
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/WorkflowSchedulerFactoryUtilTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.workflow;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.WorkflowSchedulerFactory;
+import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil;
+import org.apache.flink.table.workflow.WorkflowScheduler;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil.createWorkflowScheduler;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WorkflowSchedulerFactoryUtil}. */
+public class WorkflowSchedulerFactoryUtilTest {
+
+    @Test
+    void testCreateWorkflowScheduler() {
+        final Map<String, String> options = getDefaultConfig();
+        WorkflowScheduler<?> actual =
+                createWorkflowScheduler(
+                        Configuration.fromMap(options),
+                        Thread.currentThread().getContextClassLoader());
+
+        WorkflowScheduler<?> expected =
+                new 
TestWorkflowSchedulerFactory.TestWorkflowScheduler("user1", "9999", "project1");
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateWorkflowSchedulerWithoutType() {
+        WorkflowScheduler<?> actual =
+                createWorkflowScheduler(
+                        new Configuration(), 
Thread.currentThread().getContextClassLoader());
+
+        assertThat(actual).isNull();
+    }
+
+    @Test
+    void testCreateWorkflowSchedulerWithUnknownType() {
+        final Map<String, String> options = getDefaultConfig();
+        options.put("workflow-scheduler.type", "unknown");
+
+        validateException(
+                options,
+                String.format(
+                        "Could not find any factory for identifier 'unknown' "
+                                + "that implements '%s' in the classpath.",
+                        WorkflowSchedulerFactory.class.getCanonicalName()));
+    }
+
+    @Test
+    void testCreateWorkflowSchedulerWithMissingOptions() {
+        final Map<String, String> options = getDefaultConfig();
+        options.remove("workflow-scheduler.test.user-name");
+
+        validateException(
+                options,
+                "One or more required options are missing.\n\n"
+                        + "Missing required options are:\n\n"
+                        + "user-name");
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private void validateException(Map<String, String> options, String 
errorMessage) {
+        assertThatThrownBy(
+                        () ->
+                                createWorkflowScheduler(
+                                        Configuration.fromMap(options),
+                                        
Thread.currentThread().getContextClassLoader()))
+                .satisfies(anyCauseMatches(ValidationException.class, 
errorMessage));
+    }
+
+    private Map<String, String> getDefaultConfig() {
+        Map<String, String> config = new HashMap<>();
+        config.put("workflow-scheduler.type", "test");
+        config.put("workflow-scheduler.test.user-name", "user1");
+        config.put("workflow-scheduler.test.password", "9999");
+        config.put("workflow-scheduler.test.project-name", "project1");
+        return config;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 22b140e3e7a..cad6079cdc2 100644
--- 
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -25,3 +25,4 @@ org.apache.flink.table.factories.TestCatalogFactory
 org.apache.flink.table.factories.TestCatalogStoreFactory
 org.apache.flink.table.factories.TestManagedTableFactory
 org.apache.flink.table.factories.module.DummyModuleFactory
+org.apache.flink.table.factories.workflow.TestWorkflowSchedulerFactory


Reply via email to