luoyuxia commented on code in PR #22821:
URL: https://github.com/apache/flink/pull/22821#discussion_r1246019486


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/procedures/Procedure.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.procedures;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Base interface representing a stored procedure that can be executed by Flink. A stored procedure
+ * accepts zero, one, or multiple input parameters and returns the execution result of the
+ * stored procedure.
+ *
+ * <p>The behavior of a {@link Procedure} is defined by implementing a custom call method. A call
+ * method must be declared public, not static, and named <code>call</code>. Call methods can also
+ * be overloaded by implementing multiple methods named <code>call</code>. Currently, users are
+ * not allowed to define their own procedures; a custom {@link Procedure} can only be provided
+ * by a {@link Catalog}. To provide a {@link Procedure}, the {@link Catalog} must implement {@link
+ * Catalog#getProcedure(ObjectPath)}.
+ *
+ * <p>When calling a stored procedure, Flink will always pass the <code>
+ * org.apache.flink.table.procedure.ProcedureContext</code>, which currently provides a
+ * StreamExecutionEnvironment, as the first parameter of the <code>call</code> method. So,
+ * the custom <code>call</code> method must accept the <code>
+ * org.apache.flink.table.procedure.ProcedureContext
+ * </code> as its first parameter, and the remaining parameters of the <code>call</code> method are
+ * the actual parameters of the stored procedure.
+ *
+ * <p>By default, input and output data types are automatically extracted using reflection. The
+ * input arguments are derived from one or more {@code call()} methods. If the reflective
+ * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
+ * {@link ProcedureHint}. When hinting input arguments, only the arguments starting from the
+ * second one should be hinted, since the first argument is always the <code>
+ * ProcedureContext</code>, which doesn't need to be annotated with a data type hint.
+ *
+ * <p>Note: The return type of the {@code call()} method should always be T[], where T can be an
+ * atomic type, Row, or a POJO. An atomic type will be implicitly wrapped into a row consisting of
+ * one field. Also, the {@link DataTypeHint} for the output data type is used to hint T.
+ *
+ * <p>The following examples with pseudocode show how to write a stored procedure:
+ *
+ * <pre>{@code
+ * // a stored procedure that tries to rewrite data files for Iceberg; it accepts STRING
+ * // and returns an array of explicit ROW < STRING, STRING >.
+ * class IcebergRewriteDataFilesProcedure implements Procedure {
+ *   public @DataTypeHint("ROW< rewritten_data_files_count STRING, added_data_files_count STRING >")
+ *          Row[] call(ProcedureContext procedureContext, String tableName) {
+ *     // plan for scanning the table to do rewriting
+ *     Table table = loadTable(tableName);
+ *     List<CombinedScanTask> combinedScanTasks = planScanTask(table);
+ *
+ *     // now, rewrite the files according to the planning task
+ *     StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
+ *     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
+ *     RowDataRewriter rowDataRewriter =
+ *         new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
+ *     List<DataFile> addedDataFiles;
+ *     try {
+ *       addedDataFiles = rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+ *     } catch (Exception e) {
+ *       throw new RuntimeException("Rewrite data file error.", e);
+ *     }
+ *
+ *     // replace the current files
+ *     List<DataFile> currentDataFiles = combinedScanTasks.stream()
+ *             .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
+ *             .collect(Collectors.toList());
+ *     replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
+ *
+ *     // return the result for rewriting
+ *     return new Row[] {Row.of(currentDataFiles.size(), addedDataFiles.size())};
+ *   }
+ * }
+ *
+ * // a stored procedure that accepts < STRING, LONG > and
+ * // returns an array of STRING without a data type hint.
+ * class RollbackToSnapShotProcedure implements Procedure {
+ *   public String[] call(ProcedureContext procedureContext, String tableName, Long snapshot) {
+ *     Table table = loadTable(tableName);
+ *     Long previousSnapShotId = table.currentSnapshot();
+ *     table.manageSnapshots().rollbackTo(snapshot).commit();
+ *     return new String[] {
+ *             "previous_snapshot_id: " + previousSnapShotId,
+ *             "current_snapshot_id " + snapshot
+ *     };
+ *   }
+ * }
+ * }</pre>
+ *
+ * <p>In the API, a stored procedure can be used as follows:

Review Comment:
   Thanks for the advice, I've changed it to `In term of `.
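   For context on that sentence, a minimal sketch of what such a usage example could look like from the Table API. It assumes the `CALL` statement support this FLIP is building towards, and the catalog, database, procedure, and argument names below are all placeholders, not part of this PR:

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class CallProcedureExample {
       public static void main(String[] args) {
           // The procedure is resolved through the current catalog, which must implement
           // Catalog#getProcedure(ObjectPath).
           TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

           // "my_catalog" is a placeholder for a catalog that provides procedures and has
           // been registered beforehand.
           tEnv.useCatalog("my_catalog");

           // The two literal arguments map to the 2nd and 3rd parameters of the matching
           // call() method; the ProcedureContext is injected by Flink as the 1st parameter.
           tEnv.executeSql("CALL my_db.rollback_to_snapshot('db.my_table', 1024)").print();
       }
   }
   ```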



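   Since the Javadoc above says a `Procedure` can only be provided by a `Catalog` via `Catalog#getProcedure(ObjectPath)`, here is also a rough, incomplete sketch of how a catalog could surface one. The class names, the `echo` procedure, and the exception handling are my assumptions; all the other methods required by the `Catalog` interface are omitted:

   ```java
   import org.apache.flink.table.catalog.ObjectPath;
   import org.apache.flink.table.catalog.exceptions.CatalogException;
   import org.apache.flink.table.procedure.ProcedureContext;
   import org.apache.flink.table.procedures.Procedure;

   // Hypothetical, incomplete catalog fragment: only the procedure lookup is sketched.
   public class MyProcedureCatalog /* implements Catalog */ {

       // A trivial procedure used only for this sketch: echoes its argument back.
       public static class EchoProcedure implements Procedure {
           public String[] call(ProcedureContext context, String message) {
               return new String[] {message};
           }
       }

       // Mirrors the Catalog#getProcedure(ObjectPath) hook mentioned in the Javadoc above;
       // the exact exception contract is an assumption in this sketch.
       public Procedure getProcedure(ObjectPath procedurePath) throws CatalogException {
           if ("echo".equals(procedurePath.getObjectName())) {
               return new EchoProcedure();
           }
           throw new CatalogException(
                   "Procedure " + procedurePath.getFullName() + " does not exist.");
       }
   }
   ```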
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
