This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 76a0486  [FLINK-29602] Add Transformer for SQLTransformer
76a0486 is described below

commit 76a04869c117e15384f586931416ce1ed301bdf0
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 21 15:19:26 2022 +0800

    [FLINK-29602] Add Transformer for SQLTransformer
    
    This closes #175.
---
 .../docs/operators/feature/sqltransformer.md       | 142 ++++++++++++++
 .../ml/examples/feature/SQLTransformerExample.java |  56 ++++++
 .../ml/feature/sqltransformer/SQLTransformer.java  | 193 ++++++++++++++++++
 .../sqltransformer/SQLTransformerParams.java       |  52 +++++
 .../flink/ml/feature/SQLTransformerTest.java       | 215 +++++++++++++++++++++
 .../examples/ml/feature/sqltransformer_example.py  |  49 +++++
 .../pyflink/ml/lib/feature/sqltransformer.py       |  85 ++++++++
 .../ml/lib/feature/tests/test_sqltransformer.py    |  65 +++++++
 8 files changed, 857 insertions(+)

diff --git a/docs/content/docs/operators/feature/sqltransformer.md 
b/docs/content/docs/operators/feature/sqltransformer.md
new file mode 100644
index 0000000..e785da8
--- /dev/null
+++ b/docs/content/docs/operators/feature/sqltransformer.md
@@ -0,0 +1,142 @@
+---
+title: "SQLTransformer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/sqltransformer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## SQLTransformer
+
+SQLTransformer implements the transformations that are defined by SQL 
statement.
+
+Currently we only support SQL syntax like `SELECT ... FROM __THIS__ ...` where
+`__THIS__` represents the input table and cannot be modified.
+
+The select clause specifies the fields, constants, and expressions to display 
in
+the output. Except the cases described in the note section below, it can be any
+select clause that Flink SQL supports. Users can also use Flink SQL built-in
+function and UDFs to operate on these selected columns.
+
+For example, SQLTransformer supports statements like:
+
+- `SELECT a, a + b AS a_b FROM __THIS__`
+- `SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5`
+- `SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b`
+
+Note: This operator only generates append-only/insert-only table as its output.
+If the output table could possibly contain retract messages(e.g. perform 
`SELECT
+... FROM __THIS__ GROUP BY ...` operation on a table in streaming mode), this
+operator would aggregate all changelogs and only output the final state.
+
+### Parameters
+
+| Key       | Default | Type   | Required | Description    |
+|:----------|:--------|:-------|:---------|:---------------|
+| statement | `null`  | String | yes      | SQL statement. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.feature.sqltransformer.SQLTransformer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+/** Simple program that creates a SQLTransformer instance and uses it for 
feature engineering. */
+public class SQLTransformerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromCollection(
+                        Arrays.asList(Row.of(0, 1.0, 3.0), Row.of(2, 2.0, 
5.0)),
+                        new RowTypeInfo(Types.INT, Types.DOUBLE, 
Types.DOUBLE));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("id", "v1", 
"v2");
+
+        // Creates a SQLTransformer object and initializes its parameters.
+        SQLTransformer sqlTransformer =
+                new SQLTransformer()
+                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS 
v4 FROM __THIS__");
+
+        // Uses the SQLTransformer object for feature transformations.
+        Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        outputTable.execute().print();
+    }
+}
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a SQLTransformer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.sqltransformer import SQLTransformer
+from pyflink.table import StreamTableEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data.
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (0, 1.0, 3.0),
+        (2, 2.0, 5.0),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'v1', 'v2'],
+            [Types.INT(), Types.DOUBLE(), Types.DOUBLE()])))
+
+# Creates a SQLTransformer object and initializes its parameters.
+sql_transformer = SQLTransformer() \
+    .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')
+
+# Uses the SQLTransformer object for feature transformations.
+output_table = sql_transformer.transform(input_data_table)[0]
+
+# Extracts and displays the results.
+output_table.execute().print()
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git 
a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/SQLTransformerExample.java
 
b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/SQLTransformerExample.java
new file mode 100644
index 0000000..1a64c3b
--- /dev/null
+++ 
b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/SQLTransformerExample.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.feature.sqltransformer.SQLTransformer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+/** Simple program that creates a SQLTransformer instance and uses it for 
feature engineering. */
+public class SQLTransformerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromCollection(
+                        Arrays.asList(Row.of(0, 1.0, 3.0), Row.of(2, 2.0, 
5.0)),
+                        new RowTypeInfo(Types.INT, Types.DOUBLE, 
Types.DOUBLE));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("id", "v1", 
"v2");
+
+        // Creates a SQLTransformer object and initializes its parameters.
+        SQLTransformer sqlTransformer =
+                new SQLTransformer()
+                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS 
v4 FROM __THIS__");
+
+        // Uses the SQLTransformer object for feature transformations.
+        Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        outputTable.execute().print();
+    }
+}
diff --git 
a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformer.java
 
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformer.java
new file mode 100644
index 0000000..4707cab
--- /dev/null
+++ 
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.sqltransformer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SQLTransformer implements the transformations that are defined by SQL 
statement.
+ *
+ * <p>Currently we only support SQL syntax like `SELECT ... FROM __THIS__ ...` 
where `__THIS__`
+ * represents the input table and cannot be modified.
+ *
+ * <p>The select clause specifies the fields, constants, and expressions to 
display in the output.
+ * Except the cases described in the note section below, it can be any select 
clause that Flink SQL
+ * supports. Users can also use Flink SQL built-in function and UDFs to 
operate on these selected
+ * columns.
+ *
+ * <p>For example, SQLTransformer supports statements like:
+ *
+ * <ul>
+ *   <li>`SELECT a, a + b AS a_b FROM __THIS__`
+ *   <li>`SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5`
+ *   <li>`SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b`
+ * </ul>
+ *
+ * <p>Note: This operator only generates append-only/insert-only table as its 
output. If the output
+ * table could possibly contain retract messages(e.g. perform `SELECT ... FROM 
__THIS__ GROUP BY
+ * ...` operation on a table in streaming mode), this operator would aggregate 
all changelogs and
+ * only output the final state.
+ */
+public class SQLTransformer
+        implements Transformer<SQLTransformer>, 
SQLTransformerParams<SQLTransformer> {
+    static final String TABLE_IDENTIFIER = "__THIS__";
+
+    private static final String INSERT_ONLY_EXCEPTION_PATTERN =
+            "^.* doesn't support consuming .* changes which is produced by 
node .*$";
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public SQLTransformer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+        String statement = getStatement().replace(TABLE_IDENTIFIER, 
inputs[0].toString());
+
+        Table outputTable = tEnv.sqlQuery(statement);
+
+        if (!isInsertOnlyTable(tEnv, outputTable)) {
+            Schema schema =
+                    
Schema.newBuilder().fromResolvedSchema(outputTable.getResolvedSchema()).build();
+            DataStream<Row> outputStream = tEnv.toChangelogStream(outputTable, 
schema);
+
+            outputStream =
+                    outputStream
+                            .windowAll(EndOfStreamWindows.get())
+                            .aggregate(
+                                    new ChangeLogStreamToDataStreamFunction(),
+                                    Types.LIST(outputStream.getType()),
+                                    Types.LIST(outputStream.getType()))
+                            .flatMap(new FlattenListFunction<>(), 
outputStream.getType());
+
+            outputTable = tEnv.fromDataStream(outputStream, schema);
+        }
+
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static SQLTransformer load(StreamTableEnvironment tEnv, String 
path) throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    private boolean isInsertOnlyTable(StreamTableEnvironment tEnv, Table 
table) {
+        try {
+            tEnv.toDataStream(table);
+            return true;
+        } catch (Exception e) {
+            if (e instanceof TableException
+                    && e.getMessage() != null
+                    && e.getMessage().matches(INSERT_ONLY_EXCEPTION_PATTERN)) {
+                return false;
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * A function that converts a bounded changelog stream to an insert-only 
datastream. It
+     * aggregates all records in a bounded changelog stream and outputs each 
record in the
+     * aggregation result. Records are output according to their last 
modification time.
+     */
+    private static class ChangeLogStreamToDataStreamFunction
+            implements AggregateFunction<Row, List<Row>, List<Row>> {
+        @Override
+        public List<Row> createAccumulator() {
+            return new ArrayList<>();
+        }
+
+        @Override
+        public List<Row> add(Row value, List<Row> accumulator) {
+            switch (value.getKind()) {
+                case INSERT:
+                    accumulator.add(value);
+                    break;
+                case UPDATE_AFTER:
+                    value.setKind(RowKind.INSERT);
+                    accumulator.add(value);
+                    break;
+                case UPDATE_BEFORE:
+                case DELETE:
+                    value.setKind(RowKind.INSERT);
+                    accumulator.remove(value);
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+            return accumulator;
+        }
+
+        @Override
+        public List<Row> getResult(List<Row> accumulator) {
+            return accumulator;
+        }
+
+        @Override
+        public List<Row> merge(List<Row> a, List<Row> b) {
+            a.addAll(b);
+            return a;
+        }
+    }
+
+    private static class FlattenListFunction<T> implements 
FlatMapFunction<List<T>, T> {
+        @Override
+        public void flatMap(List<T> values, Collector<T> out) throws Exception 
{
+            for (T value : values) {
+                out.collect(value);
+            }
+        }
+    }
+}
diff --git 
a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformerParams.java
 
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformerParams.java
new file mode 100644
index 0000000..389901f
--- /dev/null
+++ 
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformerParams.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.sqltransformer;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidator;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+import static 
org.apache.flink.ml.feature.sqltransformer.SQLTransformer.TABLE_IDENTIFIER;
+
+/**
+ * Params for {@link SQLTransformer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface SQLTransformerParams<T> extends WithParams<T> {
+    Param<String> STATEMENT =
+            new StringParam("statement", "SQL statement.", null, new 
SQLStatementValidator());
+
+    default String getStatement() {
+        return get(STATEMENT);
+    }
+
+    default T setStatement(String value) {
+        return set(STATEMENT, value);
+    }
+
+    /** Param validator for SQL statements. */
+    class SQLStatementValidator implements ParamValidator<String> {
+        @Override
+        public boolean validate(String value) {
+            return value != null && value.contains(TABLE_IDENTIFIER);
+        }
+    }
+}
diff --git 
a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/SQLTransformerTest.java
 
b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/SQLTransformerTest.java
new file mode 100644
index 0000000..3e3952b
--- /dev/null
+++ 
b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/SQLTransformerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.sqltransformer.SQLTransformer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link SQLTransformer}. */
+public class SQLTransformerTest extends AbstractTestBase {
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(0, 1.0, 3.0),
+                    Row.of(1, 2.0, 3.0),
+                    Row.of(2, 2.0, 2.0),
+                    Row.of(3, 4.0, 2.0));
+
+    private static final List<Row> EXPECTED_NUMERIC_DATA_OUTPUT =
+            Arrays.asList(
+                    Row.of(0, 1.0, 3.0, 4.0, 3.0),
+                    Row.of(1, 2.0, 3.0, 5.0, 6.0),
+                    Row.of(2, 2.0, 2.0, 4.0, 4.0),
+                    Row.of(3, 4.0, 2.0, 6.0, 8.0));
+
+    private static final List<Row> EXPECTED_BUILT_IN_FUNCTION_OUTPUT =
+            Arrays.asList(
+                    Row.of(0, 1.0, 3.0, 1.0),
+                    Row.of(1, 2.0, 3.0, Math.sqrt(2.0)),
+                    Row.of(2, 2.0, 2.0, Math.sqrt(2.0)),
+                    Row.of(3, 4.0, 2.0, 2.0));
+
+    private static final List<Row> EXPECTED_GROUP_BY_AGGREGATION_OUTPUT =
+            Arrays.asList(Row.of(3.0, 3.0), Row.of(2.0, 6.0));
+
+    private static final List<Row> EXPECTED_WINDOW_AGGREGATION_OUTPUT =
+            Collections.singletonList(Row.of(9.0));
+
+    private StreamTableEnvironment tEnv;
+    private StreamExecutionEnvironment env;
+    private Table inputTable;
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> inputStream =
+                env.fromCollection(
+                        INPUT_DATA, new RowTypeInfo(Types.INT, Types.DOUBLE, 
Types.DOUBLE));
+        inputTable = tEnv.fromDataStream(inputStream).as("id", "v1", "v2");
+    }
+
+    @Test
+    public void testParam() {
+        SQLTransformer sqlTransformer = new SQLTransformer();
+        sqlTransformer.setStatement("SELECT * FROM __THIS__");
+        assertEquals("SELECT * FROM __THIS__", sqlTransformer.getStatement());
+    }
+
+    @Test
+    public void testInvalidSQLStatement() {
+        SQLTransformer sqlTransformer = new SQLTransformer();
+
+        try {
+            sqlTransformer.setStatement("SELECT * FROM __THAT__");
+            fail();
+        } catch (Exception e) {
+            assertEquals(
+                    "Parameter statement is given an invalid value SELECT * 
FROM __THAT__",
+                    e.getMessage());
+        }
+    }
+
+    @Test
+    public void testOutputSchema() {
+        SQLTransformer sqlTransformer =
+                new SQLTransformer()
+                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS 
v4 FROM __THIS__");
+
+        Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+        assertEquals(
+                Arrays.asList("id", "v1", "v2", "v3", "v4"),
+                outputTable.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testTransformNumericData() {
+        SQLTransformer sqlTransformer =
+                new SQLTransformer()
+                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS 
v4 FROM __THIS__");
+
+        Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+        verifyOutputResult(outputTable, EXPECTED_NUMERIC_DATA_OUTPUT);
+    }
+
+    @Test
+    public void testBuiltInFunction() {
+        SQLTransformer sqlTransformer =
+                new SQLTransformer().setStatement("SELECT *, SQRT(v1) AS v3 
FROM __THIS__");
+
+        Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+        verifyOutputResult(outputTable, EXPECTED_BUILT_IN_FUNCTION_OUTPUT);
+    }
+
+    @Test
+    public void testGroupByAggregation() {
+        SQLTransformer sqlTransformer =
+                new SQLTransformer()
+                        .setStatement("SELECT v2, SUM(v1) AS v3 FROM __THIS__ 
GROUP BY v2");
+
+        Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+        verifyOutputResult(outputTable, EXPECTED_GROUP_BY_AGGREGATION_OUTPUT);
+    }
+
+    @Test
+    public void testWindowAggregation() {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("v1", DataTypes.DOUBLE())
+                        .column("v2", DataTypes.DOUBLE())
+                        .columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(id * 
1000, 3)")
+                        .watermark("time_ltz", "time_ltz - INTERVAL '5' 
SECOND")
+                        .build();
+
+        DataStream<Row> inputStream =
+                env.fromCollection(
+                        INPUT_DATA,
+                        new RowTypeInfo(
+                                new TypeInformation[] {Types.INT, 
Types.DOUBLE, Types.DOUBLE},
+                                new String[] {"id", "v1", "v2"}));
+        inputTable = tEnv.fromDataStream(inputStream, schema);
+
+        String statement =
+                "SELECT SUM(v1) AS v3 "
+                        + "FROM TABLE(TUMBLE(TABLE __THIS__, 
DESCRIPTOR(time_ltz), INTERVAL '10' MINUTES)) "
+                        + "GROUP BY window_start, window_end";
+
+        SQLTransformer sqlTransformer = new 
SQLTransformer().setStatement(statement);
+
+        Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+        verifyOutputResult(outputTable, EXPECTED_WINDOW_AGGREGATION_OUTPUT);
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        SQLTransformer sqlTransformer =
+                new SQLTransformer()
+                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS 
v4 FROM __THIS__");
+
+        SQLTransformer loadedSQLTransformer =
+                TestUtils.saveAndReload(
+                        tEnv, sqlTransformer, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+        Table outputTable = loadedSQLTransformer.transform(inputTable)[0];
+
+        verifyOutputResult(outputTable, EXPECTED_NUMERIC_DATA_OUTPUT);
+    }
+
+    private static void verifyOutputResult(Table outputTable, List<Row> 
expectedOutput) {
+        List<Row> actualOutput = 
IteratorUtils.toList(outputTable.execute().collect());
+        assertEquals(new HashSet<>(expectedOutput), new 
HashSet<>(actualOutput));
+    }
+}
diff --git 
a/flink-ml-python/pyflink/examples/ml/feature/sqltransformer_example.py 
b/flink-ml-python/pyflink/examples/ml/feature/sqltransformer_example.py
new file mode 100644
index 0000000..a56d0ad
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/sqltransformer_example.py
@@ -0,0 +1,49 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a SQLTransformer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.sqltransformer import SQLTransformer
+from pyflink.table import StreamTableEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data.
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (0, 1.0, 3.0),
+        (2, 2.0, 5.0),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'v1', 'v2'],
+            [Types.INT(), Types.DOUBLE(), Types.DOUBLE()])))
+
+# Creates a SQLTransformer object and initializes its parameters.
+sql_transformer = SQLTransformer() \
+    .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')
+
+# Uses the SQLTransformer object for feature transformations.
+output_table = sql_transformer.transform(input_data_table)[0]
+
+# Extracts and displays the results.
+output_table.execute().print()
diff --git a/flink-ml-python/pyflink/ml/lib/feature/sqltransformer.py 
b/flink-ml-python/pyflink/ml/lib/feature/sqltransformer.py
new file mode 100644
index 0000000..b07af4d
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/sqltransformer.py
@@ -0,0 +1,85 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import typing
+
+from pyflink.ml.core.param import Param, StringParam
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+
+
class _SQLTransformerParams(
    JavaWithParams
):
    """
    Params for :class:`SQLTransformer`.
    """

    # SQL statement parameter; the statement is expected to reference the
    # input table as `__THIS__`. No default value.
    STATEMENT: Param[str] = StringParam(
        "statement",
        "SQL statement.",
        None
    )

    def __init__(self, java_params):
        super(_SQLTransformerParams, self).__init__(java_params)

    def set_statement(self, value: str) -> '_SQLTransformerParams':
        """
        Sets the SQL statement to execute.

        Returns this params instance so that calls can be chained.
        """
        return typing.cast(_SQLTransformerParams, self.set(self.STATEMENT, value))

    def get_statement(self) -> str:
        """Returns the current value of the SQL statement parameter."""
        return self.get(self.STATEMENT)

    @property
    def statement(self) -> str:
        """The SQL statement parameter, read via :meth:`get_statement`."""
        return self.get_statement()
+
+
class SQLTransformer(JavaFeatureTransformer, _SQLTransformerParams):
    """
    A Transformer that applies the transformations defined by a SQL statement.

    Only SQL syntax of the form `SELECT ... FROM __THIS__ ...` is supported at
    the moment, where the fixed placeholder `__THIS__` stands for the input
    table.

    The select clause names the fields, constants, and expressions that appear
    in the output. Apart from the cases called out in the note below, any
    select clause accepted by Flink SQL may be used, including Flink SQL
    built-in functions and UDFs applied to the selected columns.

    For example, SQLTransformer supports statements like:

    - `SELECT a, a + b AS a_b FROM __THIS__`
    - `SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5`
    - `SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b`

    Note: The output of this operator is always an append-only/insert-only
    table. When the result could otherwise carry retract messages (e.g. a
    `SELECT ... FROM __THIS__ GROUP BY ...` over a table in streaming mode),
    the operator aggregates all changelogs and emits only the final state.
    """

    def __init__(self, java_model=None):
        super(SQLTransformer, self).__init__(java_model)

    @classmethod
    def _java_transformer_package_name(cls) -> str:
        # Java-side package name under the feature package.
        return "sqltransformer"

    @classmethod
    def _java_transformer_class_name(cls) -> str:
        # Java-side transformer class name.
        return "SQLTransformer"
diff --git 
a/flink-ml-python/pyflink/ml/lib/feature/tests/test_sqltransformer.py 
b/flink-ml-python/pyflink/ml/lib/feature/tests/test_sqltransformer.py
new file mode 100644
index 0000000..90470a7
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_sqltransformer.py
@@ -0,0 +1,65 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common import Types, Row
+
+from pyflink.ml.lib.feature.sqltransformer import SQLTransformer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
class SQLTransformerTest(PyFlinkMLTestCase):
    def setUp(self):
        super(SQLTransformerTest, self).setUp()
        # Input table holding rows of (id, v1, v2).
        self.input_table = self.t_env.from_data_stream(
            self.env.from_collection(
                [
                    (0, 1.0, 3.0),
                    (2, 2.0, 5.0),
                ],
                type_info=Types.ROW_NAMED(
                    ['id', 'v1', 'v2'],
                    [Types.INT(), Types.DOUBLE(), Types.DOUBLE()])))
        # Expected rows after appending v3 = v1 + v2 and v4 = v1 * v2.
        self.expected_output = [
            (0, 1.0, 3.0, 4.0, 3.0),
            (2, 2.0, 5.0, 7.0, 10.0)
        ]

    def test_param(self):
        # Verifies that the statement parameter round-trips.
        transformer = SQLTransformer()
        transformer.set_statement('SELECT * FROM __THIS__')
        self.assertEqual('SELECT * FROM __THIS__', transformer.statement)

    def test_output_schema(self):
        # Verifies that projected columns appear in the output schema.
        transformer = SQLTransformer() \
            .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')
        result_table = transformer.transform(self.input_table)[0]

        self.assertEqual(
            ['id', 'v1', 'v2', 'v3', 'v4'],
            result_table.get_schema().get_field_names())

    def test_save_load_transform(self):
        # Verifies that a saved-and-reloaded transformer produces the
        # expected rows.
        transformer = SQLTransformer() \
            .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')
        reloaded = self.save_and_reload(transformer)
        result_table = reloaded.transform(self.input_table)[0]
        collected = list(
            self.t_env.to_data_stream(result_table).execute_and_collect())
        collected.sort(key=lambda row: row[0])
        self.assertEqual(len(self.expected_output), len(collected))
        for expected, actual in zip(self.expected_output, collected):
            self.assertEqual(Row(*expected), actual)


Reply via email to