This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 76a0486 [FLINK-29602] Add Transformer for SQLTransformer
76a0486 is described below
commit 76a04869c117e15384f586931416ce1ed301bdf0
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 21 15:19:26 2022 +0800
[FLINK-29602] Add Transformer for SQLTransformer
This closes #175.
---
.../docs/operators/feature/sqltransformer.md | 142 ++++++++++++++
.../ml/examples/feature/SQLTransformerExample.java | 56 ++++++
.../ml/feature/sqltransformer/SQLTransformer.java | 193 ++++++++++++++++++
.../sqltransformer/SQLTransformerParams.java | 52 +++++
.../flink/ml/feature/SQLTransformerTest.java | 215 +++++++++++++++++++++
.../examples/ml/feature/sqltransformer_example.py | 49 +++++
.../pyflink/ml/lib/feature/sqltransformer.py | 85 ++++++++
.../ml/lib/feature/tests/test_sqltransformer.py | 65 +++++++
8 files changed, 857 insertions(+)
diff --git a/docs/content/docs/operators/feature/sqltransformer.md
b/docs/content/docs/operators/feature/sqltransformer.md
new file mode 100644
index 0000000..e785da8
--- /dev/null
+++ b/docs/content/docs/operators/feature/sqltransformer.md
@@ -0,0 +1,142 @@
+---
+title: "SQLTransformer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/sqltransformer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## SQLTransformer
+
+SQLTransformer implements the transformations that are defined by a SQL
+statement.
+
+Currently we only support SQL syntax like `SELECT ... FROM __THIS__ ...` where
+`__THIS__` represents the input table and cannot be modified.
+
+The select clause specifies the fields, constants, and expressions to display
in
+the output. Except the cases described in the note section below, it can be any
+select clause that Flink SQL supports. Users can also use Flink SQL built-in
+function and UDFs to operate on these selected columns.
+
+For example, SQLTransformer supports statements like:
+
+- `SELECT a, a + b AS a_b FROM __THIS__`
+- `SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5`
+- `SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b`
+
+Note: This operator only generates an append-only/insert-only table as its
+output. If the output table could possibly contain retract messages (e.g. when
+performing a `SELECT ... FROM __THIS__ GROUP BY ...` operation on a table in
+streaming mode), this operator aggregates all changelogs and only outputs the
+final state.
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|:----------|:--------|:-------|:---------|:---------------|
+| statement | `null` | String | yes | SQL statement. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.feature.sqltransformer.SQLTransformer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+/** Simple program that creates a SQLTransformer instance and uses it for
feature engineering. */
+public class SQLTransformerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<Row> inputStream =
+ env.fromCollection(
+ Arrays.asList(Row.of(0, 1.0, 3.0), Row.of(2, 2.0,
5.0)),
+ new RowTypeInfo(Types.INT, Types.DOUBLE,
Types.DOUBLE));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("id", "v1",
"v2");
+
+ // Creates a SQLTransformer object and initializes its parameters.
+ SQLTransformer sqlTransformer =
+ new SQLTransformer()
+ .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS
v4 FROM __THIS__");
+
+ // Uses the SQLTransformer object for feature transformations.
+ Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ outputTable.execute().print();
+ }
+}
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a SQLTransformer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.sqltransformer import SQLTransformer
+from pyflink.table import StreamTableEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data.
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ (0, 1.0, 3.0),
+ (2, 2.0, 5.0),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['id', 'v1', 'v2'],
+ [Types.INT(), Types.DOUBLE(), Types.DOUBLE()])))
+
+# Creates a SQLTransformer object and initializes its parameters.
+sql_transformer = SQLTransformer() \
+ .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')
+
+# Uses the SQLTransformer object for feature transformations.
+output_table = sql_transformer.transform(input_data_table)[0]
+
+# Extracts and displays the results.
+output_table.execute().print()
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git
a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/SQLTransformerExample.java
b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/SQLTransformerExample.java
new file mode 100644
index 0000000..1a64c3b
--- /dev/null
+++
b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/SQLTransformerExample.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.feature.sqltransformer.SQLTransformer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+/** Simple program that creates a SQLTransformer instance and uses it for
feature engineering. */
+public class SQLTransformerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<Row> inputStream =
+ env.fromCollection(
+ Arrays.asList(Row.of(0, 1.0, 3.0), Row.of(2, 2.0,
5.0)),
+ new RowTypeInfo(Types.INT, Types.DOUBLE,
Types.DOUBLE));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("id", "v1",
"v2");
+
+ // Creates a SQLTransformer object and initializes its parameters.
+ SQLTransformer sqlTransformer =
+ new SQLTransformer()
+ .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS
v4 FROM __THIS__");
+
+ // Uses the SQLTransformer object for feature transformations.
+ Table outputTable = sqlTransformer.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ outputTable.execute().print();
+ }
+}
diff --git
a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformer.java
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformer.java
new file mode 100644
index 0000000..4707cab
--- /dev/null
+++
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.sqltransformer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SQLTransformer implements the transformations that are defined by SQL
statement.
+ *
+ * <p>Currently we only support SQL syntax like `SELECT ... FROM __THIS__ ...`
where `__THIS__`
+ * represents the input table and cannot be modified.
+ *
+ * <p>The select clause specifies the fields, constants, and expressions to
display in the output.
+ * Except the cases described in the note section below, it can be any select
clause that Flink SQL
+ * supports. Users can also use Flink SQL built-in function and UDFs to
operate on these selected
+ * columns.
+ *
+ * <p>For example, SQLTransformer supports statements like:
+ *
+ * <ul>
+ * <li>`SELECT a, a + b AS a_b FROM __THIS__`
+ * <li>`SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5`
+ * <li>`SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b`
+ * </ul>
+ *
+ * <p>Note: This operator only generates an append-only/insert-only table as its
+ * output. If the output table could possibly contain retract messages (e.g. when
+ * performing a `SELECT ... FROM __THIS__ GROUP BY ...` operation on a table in
+ * streaming mode), this operator aggregates all changelogs and only outputs the
+ * final state.
+ */
public class SQLTransformer
        implements Transformer<SQLTransformer>, SQLTransformerParams<SQLTransformer> {
    /** Placeholder in the SQL statement that is substituted with the input table's name. */
    static final String TABLE_IDENTIFIER = "__THIS__";

    // Matches the TableException message Flink raises when a changelog (non-insert-only)
    // table is converted via toDataStream(). NOTE(review): this is coupled to Flink's
    // exact exception wording — verify it still matches after Flink version upgrades.
    private static final String INSERT_ONLY_EXCEPTION_PATTERN =
            "^.* doesn't support consuming .* changes which is produced by node .*$";

    private final Map<Param<?>, Object> paramMap = new HashMap<>();

    public SQLTransformer() {
        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
    }

    /**
     * Applies the configured SQL statement to the single input table.
     *
     * <p>If the query result is not insert-only (e.g. a GROUP BY aggregation in streaming
     * mode), the changelog is collected in an end-of-stream window and reduced so that only
     * the final state of each row is emitted, keeping the output table append-only.
     */
    @Override
    public Table[] transform(Table... inputs) {
        Preconditions.checkArgument(inputs.length == 1);
        StreamTableEnvironment tEnv =
                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
        // Table#toString() yields a name the table is registered under in tEnv; it replaces
        // the __THIS__ placeholder in the user-provided statement.
        String statement = getStatement().replace(TABLE_IDENTIFIER, inputs[0].toString());

        Table outputTable = tEnv.sqlQuery(statement);

        if (!isInsertOnlyTable(tEnv, outputTable)) {
            Schema schema =
                    Schema.newBuilder().fromResolvedSchema(outputTable.getResolvedSchema()).build();
            DataStream<Row> outputStream = tEnv.toChangelogStream(outputTable, schema);

            // Aggregates the whole bounded changelog in a single end-of-stream window and
            // re-emits only the rows that survive all retractions.
            outputStream =
                    outputStream
                            .windowAll(EndOfStreamWindows.get())
                            .aggregate(
                                    new ChangeLogStreamToDataStreamFunction(),
                                    Types.LIST(outputStream.getType()),
                                    Types.LIST(outputStream.getType()))
                            .flatMap(new FlattenListFunction<>(), outputStream.getType());

            outputTable = tEnv.fromDataStream(outputStream, schema);
        }

        return new Table[] {outputTable};
    }

    @Override
    public void save(String path) throws IOException {
        ReadWriteUtils.saveMetadata(this, path);
    }

    // tEnv is unused here; presumably kept so the signature matches the other flink-ml
    // stages' load(tEnv, path) convention — TODO confirm against sibling operators.
    public static SQLTransformer load(StreamTableEnvironment tEnv, String path) throws IOException {
        return ReadWriteUtils.loadStageParam(path);
    }

    @Override
    public Map<Param<?>, Object> getParamMap() {
        return paramMap;
    }

    /**
     * Probes whether {@code table} can be converted to an insert-only data stream by
     * attempting the conversion and inspecting any failure message. Exceptions other than
     * the expected "not insert-only" TableException are rethrown unchanged.
     */
    private boolean isInsertOnlyTable(StreamTableEnvironment tEnv, Table table) {
        try {
            tEnv.toDataStream(table);
            return true;
        } catch (Exception e) {
            if (e instanceof TableException
                    && e.getMessage() != null
                    && e.getMessage().matches(INSERT_ONLY_EXCEPTION_PATTERN)) {
                return false;
            }
            throw e;
        }
    }

    /**
     * A function that converts a bounded changelog stream to an insert-only datastream. It
     * aggregates all records in a bounded changelog stream and outputs each record in the
     * aggregation result. Records are output according to their last modification time.
     */
    private static class ChangeLogStreamToDataStreamFunction
            implements AggregateFunction<Row, List<Row>, List<Row>> {
        @Override
        public List<Row> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<Row> add(Row value, List<Row> accumulator) {
            // Kinds are normalized to INSERT before add/remove so that list lookups match
            // rows stored earlier — presumably Row equality takes the RowKind into
            // account; verify against Row#equals.
            switch (value.getKind()) {
                case INSERT:
                    accumulator.add(value);
                    break;
                case UPDATE_AFTER:
                    value.setKind(RowKind.INSERT);
                    accumulator.add(value);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Retracts the first stored occurrence of this row.
                    // NOTE(review): List#remove(Object) is O(n), so a long changelog makes
                    // this quadratic — consider a count map if it becomes a bottleneck.
                    value.setKind(RowKind.INSERT);
                    accumulator.remove(value);
                    break;
                default:
                    throw new UnsupportedOperationException();
            }
            return accumulator;
        }

        @Override
        public List<Row> getResult(List<Row> accumulator) {
            return accumulator;
        }

        @Override
        public List<Row> merge(List<Row> a, List<Row> b) {
            a.addAll(b);
            return a;
        }
    }

    /** Emits each element of an incoming list as an individual record. */
    private static class FlattenListFunction<T> implements FlatMapFunction<List<T>, T> {
        @Override
        public void flatMap(List<T> values, Collector<T> out) throws Exception {
            for (T value : values) {
                out.collect(value);
            }
        }
    }
}
diff --git
a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformerParams.java
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformerParams.java
new file mode 100644
index 0000000..389901f
--- /dev/null
+++
b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/sqltransformer/SQLTransformerParams.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.sqltransformer;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidator;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+import static
org.apache.flink.ml.feature.sqltransformer.SQLTransformer.TABLE_IDENTIFIER;
+
+/**
+ * Params for {@link SQLTransformer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface SQLTransformerParams<T> extends WithParams<T> {
+ Param<String> STATEMENT =
+ new StringParam("statement", "SQL statement.", null, new
SQLStatementValidator());
+
+ default String getStatement() {
+ return get(STATEMENT);
+ }
+
+ default T setStatement(String value) {
+ return set(STATEMENT, value);
+ }
+
+ /** Param validator for SQL statements. */
+ class SQLStatementValidator implements ParamValidator<String> {
+ @Override
+ public boolean validate(String value) {
+ return value != null && value.contains(TABLE_IDENTIFIER);
+ }
+ }
+}
diff --git
a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/SQLTransformerTest.java
b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/SQLTransformerTest.java
new file mode 100644
index 0000000..3e3952b
--- /dev/null
+++
b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/SQLTransformerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.sqltransformer.SQLTransformer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
/** Tests {@link SQLTransformer}. */
public class SQLTransformerTest extends AbstractTestBase {
    // Shared input rows with schema (id INT, v1 DOUBLE, v2 DOUBLE).
    private static final List<Row> INPUT_DATA =
            Arrays.asList(
                    Row.of(0, 1.0, 3.0),
                    Row.of(1, 2.0, 3.0),
                    Row.of(2, 2.0, 2.0),
                    Row.of(3, 4.0, 2.0));

    // Expected rows when appending v3 = v1 + v2 and v4 = v1 * v2.
    private static final List<Row> EXPECTED_NUMERIC_DATA_OUTPUT =
            Arrays.asList(
                    Row.of(0, 1.0, 3.0, 4.0, 3.0),
                    Row.of(1, 2.0, 3.0, 5.0, 6.0),
                    Row.of(2, 2.0, 2.0, 4.0, 4.0),
                    Row.of(3, 4.0, 2.0, 6.0, 8.0));

    // Expected rows when appending v3 = SQRT(v1).
    private static final List<Row> EXPECTED_BUILT_IN_FUNCTION_OUTPUT =
            Arrays.asList(
                    Row.of(0, 1.0, 3.0, 1.0),
                    Row.of(1, 2.0, 3.0, Math.sqrt(2.0)),
                    Row.of(2, 2.0, 2.0, Math.sqrt(2.0)),
                    Row.of(3, 4.0, 2.0, 2.0));

    // Expected rows for SELECT v2, SUM(v1) ... GROUP BY v2 over INPUT_DATA.
    private static final List<Row> EXPECTED_GROUP_BY_AGGREGATION_OUTPUT =
            Arrays.asList(Row.of(3.0, 3.0), Row.of(2.0, 6.0));

    // All input timestamps fall into one tumbling window, so a single SUM(v1) row.
    private static final List<Row> EXPECTED_WINDOW_AGGREGATION_OUTPUT =
            Collections.singletonList(Row.of(9.0));

    private StreamTableEnvironment tEnv;
    private StreamExecutionEnvironment env;
    private Table inputTable;

    @Before
    public void before() {
        Configuration config = new Configuration();
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.getConfig().enableObjectReuse();
        env.setParallelism(4);
        env.enableCheckpointing(100);
        env.setRestartStrategy(RestartStrategies.noRestart());
        tEnv = StreamTableEnvironment.create(env);
        DataStream<Row> inputStream =
                env.fromCollection(
                        INPUT_DATA, new RowTypeInfo(Types.INT, Types.DOUBLE, Types.DOUBLE));
        inputTable = tEnv.fromDataStream(inputStream).as("id", "v1", "v2");
    }

    // Verifies that the statement param round-trips through setter and getter.
    @Test
    public void testParam() {
        SQLTransformer sqlTransformer = new SQLTransformer();
        sqlTransformer.setStatement("SELECT * FROM __THIS__");
        assertEquals("SELECT * FROM __THIS__", sqlTransformer.getStatement());
    }

    // Verifies that a statement missing the __THIS__ placeholder is rejected by the
    // param validator.
    @Test
    public void testInvalidSQLStatement() {
        SQLTransformer sqlTransformer = new SQLTransformer();

        try {
            sqlTransformer.setStatement("SELECT * FROM __THAT__");
            fail();
        } catch (Exception e) {
            assertEquals(
                    "Parameter statement is given an invalid value SELECT * FROM __THAT__",
                    e.getMessage());
        }
    }

    // Verifies that derived columns appear in the output schema in order.
    @Test
    public void testOutputSchema() {
        SQLTransformer sqlTransformer =
                new SQLTransformer()
                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

        Table outputTable = sqlTransformer.transform(inputTable)[0];

        assertEquals(
                Arrays.asList("id", "v1", "v2", "v3", "v4"),
                outputTable.getResolvedSchema().getColumnNames());
    }

    // Verifies arithmetic expressions in the select clause.
    @Test
    public void testTransformNumericData() {
        SQLTransformer sqlTransformer =
                new SQLTransformer()
                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

        Table outputTable = sqlTransformer.transform(inputTable)[0];

        verifyOutputResult(outputTable, EXPECTED_NUMERIC_DATA_OUTPUT);
    }

    // Verifies that Flink SQL built-in functions (SQRT) are usable.
    @Test
    public void testBuiltInFunction() {
        SQLTransformer sqlTransformer =
                new SQLTransformer().setStatement("SELECT *, SQRT(v1) AS v3 FROM __THIS__");

        Table outputTable = sqlTransformer.transform(inputTable)[0];

        verifyOutputResult(outputTable, EXPECTED_BUILT_IN_FUNCTION_OUTPUT);
    }

    // Verifies that a GROUP BY query (retracting changelog) is reduced to its final state.
    @Test
    public void testGroupByAggregation() {
        SQLTransformer sqlTransformer =
                new SQLTransformer()
                        .setStatement("SELECT v2, SUM(v1) AS v3 FROM __THIS__ GROUP BY v2");

        Table outputTable = sqlTransformer.transform(inputTable)[0];

        verifyOutputResult(outputTable, EXPECTED_GROUP_BY_AGGREGATION_OUTPUT);
    }

    // Verifies a TUMBLE window aggregation over an event-time attribute derived from id.
    @Test
    public void testWindowAggregation() {
        Schema schema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("v1", DataTypes.DOUBLE())
                        .column("v2", DataTypes.DOUBLE())
                        .columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(id * 1000, 3)")
                        .watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND")
                        .build();

        DataStream<Row> inputStream =
                env.fromCollection(
                        INPUT_DATA,
                        new RowTypeInfo(
                                new TypeInformation[] {Types.INT, Types.DOUBLE, Types.DOUBLE},
                                new String[] {"id", "v1", "v2"}));
        inputTable = tEnv.fromDataStream(inputStream, schema);

        String statement =
                "SELECT SUM(v1) AS v3 "
                        + "FROM TABLE(TUMBLE(TABLE __THIS__, DESCRIPTOR(time_ltz), INTERVAL '10' MINUTES)) "
                        + "GROUP BY window_start, window_end";

        SQLTransformer sqlTransformer = new SQLTransformer().setStatement(statement);

        Table outputTable = sqlTransformer.transform(inputTable)[0];

        verifyOutputResult(outputTable, EXPECTED_WINDOW_AGGREGATION_OUTPUT);
    }

    // Verifies that a saved-and-reloaded transformer behaves like the original.
    @Test
    public void testSaveLoadAndTransform() throws Exception {
        SQLTransformer sqlTransformer =
                new SQLTransformer()
                        .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

        SQLTransformer loadedSQLTransformer =
                TestUtils.saveAndReload(
                        tEnv, sqlTransformer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());

        Table outputTable = loadedSQLTransformer.transform(inputTable)[0];

        verifyOutputResult(outputTable, EXPECTED_NUMERIC_DATA_OUTPUT);
    }

    // Compares as sets because row order from a parallel job is nondeterministic.
    private static void verifyOutputResult(Table outputTable, List<Row> expectedOutput) {
        List<Row> actualOutput = IteratorUtils.toList(outputTable.execute().collect());
        assertEquals(new HashSet<>(expectedOutput), new HashSet<>(actualOutput));
    }
}
diff --git
a/flink-ml-python/pyflink/examples/ml/feature/sqltransformer_example.py
b/flink-ml-python/pyflink/examples/ml/feature/sqltransformer_example.py
new file mode 100644
index 0000000..a56d0ad
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/sqltransformer_example.py
@@ -0,0 +1,49 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a SQLTransformer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.sqltransformer import SQLTransformer
+from pyflink.table import StreamTableEnvironment
+
env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create(env)

# Generates input data with columns (id, v1, v2).
input_data_table = t_env.from_data_stream(
    env.from_collection([
        (0, 1.0, 3.0),
        (2, 2.0, 5.0),
    ],
    type_info=Types.ROW_NAMED(
        ['id', 'v1', 'v2'],
        [Types.INT(), Types.DOUBLE(), Types.DOUBLE()])))

# Creates a SQLTransformer object and initializes its parameters.
# `__THIS__` in the statement refers to the input table.
sql_transformer = SQLTransformer() \
    .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')

# Uses the SQLTransformer object for feature transformations.
output_table = sql_transformer.transform(input_data_table)[0]

# Extracts and displays the results.
output_table.execute().print()
diff --git a/flink-ml-python/pyflink/ml/lib/feature/sqltransformer.py
b/flink-ml-python/pyflink/ml/lib/feature/sqltransformer.py
new file mode 100644
index 0000000..b07af4d
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/sqltransformer.py
@@ -0,0 +1,85 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import typing
+
+from pyflink.ml.core.param import Param, StringParam
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+
+
class _SQLTransformerParams(
    JavaWithParams
):
    """
    Params for :class:`SQLTransformer`.
    """

    # The SQL statement to execute; defaults to None. It should contain the `__THIS__`
    # placeholder for the input table — presumably enforced by the Java-side param
    # validator; verify against SQLTransformerParams.java.
    STATEMENT: Param[str] = StringParam(
        "statement",
        "SQL statement.",
        None
    )

    def __init__(self, java_params):
        super(_SQLTransformerParams, self).__init__(java_params)

    def set_statement(self, value: str) -> '_SQLTransformerParams':
        """Sets the SQL statement to be executed by the transformer."""
        return typing.cast(_SQLTransformerParams, self.set(self.STATEMENT, value))

    def get_statement(self) -> str:
        """Returns the configured SQL statement."""
        return self.get(self.STATEMENT)

    @property
    def statement(self) -> str:
        # Read-only convenience alias for get_statement().
        return self.get_statement()
+
+
class SQLTransformer(JavaFeatureTransformer, _SQLTransformerParams):
    """
    SQLTransformer implements the transformations that are defined by a SQL statement.

    Currently we only support SQL syntax like `SELECT ... FROM __THIS__ ...` where `__THIS__`
    represents the input table and cannot be modified.

    The select clause specifies the fields, constants, and expressions to display in the output.
    Except the cases described in the note section below, it can be any select clause that Flink SQL
    supports. Users can also use Flink SQL built-in function and UDFs to operate on these selected
    columns.

    For example, SQLTransformer supports statements like:

    - `SELECT a, a + b AS a_b FROM __THIS__`
    - `SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5`
    - `SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b`

    Note: This operator only generates append-only/insert-only table as its output. If the output
    table could possibly contain retract messages (e.g. perform `SELECT ... FROM __THIS__ GROUP BY
    ...` operation on a table in streaming mode), this operator would aggregate all changelogs and
    only output the final state.
    """

    def __init__(self, java_model=None):
        super(SQLTransformer, self).__init__(java_model)

    @classmethod
    def _java_transformer_package_name(cls) -> str:
        # Sub-package under the Java feature package that holds the implementation.
        return "sqltransformer"

    @classmethod
    def _java_transformer_class_name(cls) -> str:
        # Simple name of the backing Java Transformer class.
        return "SQLTransformer"
diff --git
a/flink-ml-python/pyflink/ml/lib/feature/tests/test_sqltransformer.py
b/flink-ml-python/pyflink/ml/lib/feature/tests/test_sqltransformer.py
new file mode 100644
index 0000000..90470a7
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_sqltransformer.py
@@ -0,0 +1,65 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common import Types, Row
+
+from pyflink.ml.lib.feature.sqltransformer import SQLTransformer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
class SQLTransformerTest(PyFlinkMLTestCase):
    def setUp(self):
        super(SQLTransformerTest, self).setUp()
        # Input table with columns (id, v1, v2).
        self.input_table = self.t_env.from_data_stream(
            self.env.from_collection([
                (0, 1.0, 3.0),
                (2, 2.0, 5.0),
            ],
            type_info=Types.ROW_NAMED(
                ['id', 'v1', 'v2'],
                [Types.INT(), Types.DOUBLE(), Types.DOUBLE()])))
        # Expected rows after appending v3 = v1 + v2 and v4 = v1 * v2.
        self.expected_output = [
            (0, 1.0, 3.0, 4.0, 3.0),
            (2, 2.0, 5.0, 7.0, 10.0)
        ]

    def test_param(self):
        # Verifies that the statement param round-trips through setter and getter.
        sql_transformer = SQLTransformer()
        sql_transformer.set_statement('SELECT * FROM __THIS__')
        self.assertEqual('SELECT * FROM __THIS__', sql_transformer.statement)

    def test_output_schema(self):
        # Verifies that derived columns appear in the output schema in order.
        sql_transformer = SQLTransformer() \
            .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')
        output_table = sql_transformer.transform(self.input_table)[0]

        self.assertEqual(
            ['id', 'v1', 'v2', 'v3', 'v4'],
            output_table.get_schema().get_field_names())

    def test_save_load_transform(self):
        # Verifies that a saved-and-reloaded transformer produces the expected rows.
        sql_transformer = SQLTransformer() \
            .set_statement('SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__')
        loaded_sql_transformer = self.save_and_reload(sql_transformer)
        output_table = loaded_sql_transformer.transform(self.input_table)[0]
        actual_output = [output for output in
                         self.t_env.to_data_stream(output_table).execute_and_collect()]
        # Sorts by id since row order from the job is nondeterministic.
        actual_output.sort(key=lambda x: x[0])
        self.assertEqual(len(self.expected_output), len(actual_output))
        for i in range(len(actual_output)):
            self.assertEqual(Row(*self.expected_output[i]), actual_output[i])