jiangxin369 commented on code in PR #196:
URL: https://github.com/apache/flink-ml/pull/196#discussion_r1068221091


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OnlineStandardScalerTest.java:
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.common.window.CountTumblingWindows;
+import org.apache.flink.ml.common.window.EventTimeTumblingWindows;
+import org.apache.flink.ml.common.window.GlobalWindows;
+import org.apache.flink.ml.common.window.ProcessingTimeTumblingWindows;
+import org.apache.flink.ml.feature.standardscaler.OnlineStandardScaler;
+import org.apache.flink.ml.feature.standardscaler.OnlineStandardScalerModel;
+import org.apache.flink.ml.feature.standardscaler.StandardScalerModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests {@link OnlineStandardScaler} and {@link OnlineStandardScalerModel}. 
*/
+public class OnlineStandardScalerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private final List<Row> inputData =
+            Arrays.asList(
+                    Row.of(0L, Vectors.dense(-2.5, 9, 1)),
+                    Row.of(1000L, Vectors.dense(1.4, -5, 1)),
+                    Row.of(2000L, Vectors.dense(2, -1, -2)),
+                    Row.of(6000L, Vectors.dense(0.7, 3, 1)),
+                    Row.of(7000L, Vectors.dense(0, 1, 1)),
+                    Row.of(8000L, Vectors.dense(0.5, 0, -2)),
+                    Row.of(9000L, Vectors.dense(0.4, 1, 1)),
+                    Row.of(10000L, Vectors.dense(0.3, 2, 1)),
+                    Row.of(11000L, Vectors.dense(0.5, 1, -2)));
+
+    private final List<StandardScalerModelData> expectedModelData =
+            Arrays.asList(
+                    new StandardScalerModelData(
+                            Vectors.dense(0.3, 1, 0),
+                            Vectors.dense(2.4433583, 7.2111026, 1.7320508),
+                            0L,
+                            2999L),
+                    new StandardScalerModelData(
+                            Vectors.dense(0.35, 1.1666667, 0),
+                            Vectors.dense(1.5630099, 4.6654760, 1.5491933),
+                            1L,
+                            8999L),
+                    new StandardScalerModelData(
+                            Vectors.dense(0.3666667, 1.2222222, 0),
+                            Vectors.dense(1.2369316, 3.7006005, 1.5),
+                            2L,
+                            11999L));
+
+    private static final double TOLERANCE = 1e-7;
+
+    private Table inputTable;
+
+    private Table inputTableWithProcessingTime;
+
+    private Table inputTableWithEventTime;
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+
+        DataStream<Row> inputStream = env.fromCollection(inputData);
+        inputTable =
+                tEnv.fromDataStream(
+                                inputStream,
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.BIGINT())
+                                        .column("f1", 
DataTypes.RAW(DenseVectorTypeInfo.INSTANCE))
+                                        .build())
+                        .as("id", "input");
+
+        DataStream<Row> inputStreamWithProcessingTimeGap =
+                inputStream
+                        .map(
+                                new MapFunction<Row, Row>() {
+                                    private int count = 0;
+
+                                    @Override
+                                    public Row map(Row value) throws Exception 
{
+                                        count++;
+                                        if (count % 3 == 0) {
+                                            Thread.sleep(1000);
+                                        }
+                                        return value;
+                                    }
+                                },
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            Types.LONG, 
DenseVectorTypeInfo.INSTANCE
+                                        },
+                                        new String[] {"id", "input"}))
+                        .setParallelism(1);
+
+        inputTableWithProcessingTime = 
tEnv.fromDataStream(inputStreamWithProcessingTimeGap);
+
+        DataStream<Row> inputStreamWithEventTime =
+                inputStream.assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<Row>forMonotonousTimestamps()
+                                .withTimestampAssigner(
+                                        (SerializableTimestampAssigner<Row>)
+                                                (element, recordTimestamp) ->
+                                                        
element.getFieldAs(0)));
+        inputTableWithEventTime =
+                tEnv.fromDataStream(
+                                inputStreamWithEventTime,
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.BIGINT())
+                                        .column("f1", 
DataTypes.RAW(DenseVectorTypeInfo.INSTANCE))
+                                        .columnByMetadata("rowtime", 
"TIMESTAMP_LTZ(3)")
+                                        .watermark("rowtime", 
"SOURCE_WATERMARK()")
+                                        .build())
+                        .as("id", "input");
+    }
+
+    @Test
+    public void testParam() {
+        OnlineStandardScaler standardScaler = new OnlineStandardScaler();
+
+        assertEquals("input", standardScaler.getInputCol());
+        assertEquals(false, standardScaler.getWithMean());
+        assertEquals(true, standardScaler.getWithStd());
+        assertEquals("output", standardScaler.getOutputCol());
+        assertNull(standardScaler.getModelVersionCol());
+        assertEquals(GlobalWindows.getInstance(), standardScaler.getWindows());
+        assertEquals(0L, standardScaler.getMaxAllowedModelDelayMs());
+
+        standardScaler
+                .setInputCol("test_input")
+                .setWithMean(true)
+                .setWithStd(false)
+                .setOutputCol("test_output")
+                .setModelVersionCol("model_version_col")
+                
.setWindows(EventTimeTumblingWindows.of(Time.milliseconds(3000)))
+                .setMaxAllowedModelDelayMs(3000L);
+
+        assertEquals("test_input", standardScaler.getInputCol());
+        assertEquals(true, standardScaler.getWithMean());
+        assertEquals(false, standardScaler.getWithStd());
+        assertEquals("test_output", standardScaler.getOutputCol());
+        assertEquals("model_version_col", standardScaler.getModelVersionCol());
+        assertEquals(
+                EventTimeTumblingWindows.of(Time.milliseconds(3000)), 
standardScaler.getWindows());
+        assertEquals(3000L, standardScaler.getMaxAllowedModelDelayMs());
+    }
+
+    @Test
+    public void testOutputSchema() {
+        Table renamedTable = inputTable.as("test_id", "test_input");
+        OnlineStandardScaler standardScaler =
+                new OnlineStandardScaler()
+                        .setInputCol("test_input")
+                        .setOutputCol("test_output")
+                        .setModelVersionCol("model_version_col");

Review Comment:
   Could you also test the output schema without `setModelVersionCol`? Please do 
the same in the Python tests (pytest).



##########
docs/content/docs/operators/feature/onlinestandardscaler.md:
##########
@@ -0,0 +1,261 @@
+---
+title: "OnlineStandardScaler"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/onlinestandardscaler.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## OnlineStandardScaler
+
+An Estimator which implements the online standard scaling algorithm, which 
+is the online version of StandardScaler.
+
+OnlineStandardScaler splits the input data by the user-specified window 
strategy.
+For each window, it computes the mean and standard deviation using the data 
seen
+so far (i.e., not only the data in the current window, but also the history 
data).
+The model data generated by OnlineStandardScaler is a model stream. 
+There is one model data for each window.
+
+During the inference phase (i.e., using OnlineStandardScalerModel for 
prediction),
+users could output the model version that is used for predicting each data 
point. 
+Moreover,
+- When the train data and test data both contain event time, users could 
+specify the maximum difference between timestamp of the input and model data,

Review Comment:
   ```suggestion
   specify the maximum difference between the timestamps of the input and model 
data,
   ```
   There are still several similar comments that have not been addressed.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OnlineStandardScalerTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.common.window.CountTumblingWindows;
+import org.apache.flink.ml.common.window.EventTimeTumblingWindows;
+import org.apache.flink.ml.common.window.GlobalWindows;
+import org.apache.flink.ml.feature.standardscaler.OnlineStandardScaler;
+import org.apache.flink.ml.feature.standardscaler.OnlineStandardScalerModel;
+import org.apache.flink.ml.feature.standardscaler.StandardScalerModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests {@link OnlineStandardScaler} and {@link OnlineStandardScalerModel}. 
*/
+public class OnlineStandardScalerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private final List<Row> inputData =
+            Arrays.asList(
+                    Row.of(0L, Vectors.dense(-2.5, 9, 1)),
+                    Row.of(1000L, Vectors.dense(1.4, -5, 1)),
+                    Row.of(2000L, Vectors.dense(2, -1, -2)),
+                    Row.of(6000L, Vectors.dense(0.7, 3, 1)),
+                    Row.of(7000L, Vectors.dense(0, 1, 1)),
+                    Row.of(8000L, Vectors.dense(0.5, 0, -2)),
+                    Row.of(9000L, Vectors.dense(0.4, 1, 1)),
+                    Row.of(10000L, Vectors.dense(0.3, 2, 1)),
+                    Row.of(11000L, Vectors.dense(0.5, 1, -2)));
+
+    private final List<StandardScalerModelData> expectedModelData =
+            Arrays.asList(
+                    new StandardScalerModelData(
+                            Vectors.dense(0.3, 1, 0),
+                            Vectors.dense(2.4433583, 7.2111026, 1.7320508),
+                            0L,
+                            2999L),
+                    new StandardScalerModelData(
+                            Vectors.dense(0.35, 1.1666667, 0),
+                            Vectors.dense(1.5630099, 4.6654760, 1.5491933),
+                            1L,
+                            8999L),
+                    new StandardScalerModelData(
+                            Vectors.dense(0.3666667, 1.2222222, 0),
+                            Vectors.dense(1.2369316, 3.7006005, 1.5),
+                            2L,
+                            11999L));
+
+    private static final double TOLERANCE = 1e-7;
+
+    private Table inputTable;
+
+    private Table inputTableWithEventTime;
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+
+        DataStream<Row> inputStream = env.fromCollection(inputData);
+        inputTable =
+                tEnv.fromDataStream(
+                                inputStream,
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.BIGINT())
+                                        .column("f1", 
DataTypes.RAW(DenseVectorTypeInfo.INSTANCE))
+                                        .build())
+                        .as("id", "input");
+
+        DataStream<Row> inputStreamWithEventTime =
+                inputStream.assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<Row>forMonotonousTimestamps()
+                                .withTimestampAssigner(
+                                        (SerializableTimestampAssigner<Row>)
+                                                (element, recordTimestamp) ->
+                                                        
element.getFieldAs(0)));
+        inputTableWithEventTime =
+                tEnv.fromDataStream(
+                                inputStreamWithEventTime,
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.BIGINT())
+                                        .column("f1", 
DataTypes.RAW(DenseVectorTypeInfo.INSTANCE))
+                                        .columnByMetadata("rowtime", 
"TIMESTAMP_LTZ(3)")
+                                        .watermark("rowtime", 
"SOURCE_WATERMARK()")
+                                        .build())
+                        .as("id", "input");
+    }
+
+    @Test
+    public void testParam() {
+        OnlineStandardScaler standardScaler = new OnlineStandardScaler();
+
+        assertEquals("input", standardScaler.getInputCol());
+        assertEquals(false, standardScaler.getWithMean());
+        assertEquals(true, standardScaler.getWithStd());
+        assertEquals("output", standardScaler.getOutputCol());
+        assertNull(standardScaler.getModelVersionCol());
+        assertEquals(GlobalWindows.getInstance(), standardScaler.getWindows());
+        assertEquals(0L, standardScaler.getMaxAllowedModelDelayMs());
+
+        standardScaler
+                .setInputCol("test_input")
+                .setWithMean(true)
+                .setWithStd(false)
+                .setOutputCol("test_output")
+                .setModelVersionCol("model_version_col")
+                
.setWindows(EventTimeTumblingWindows.of(Time.milliseconds(3000)))
+                .setMaxAllowedModelDelayMs(3000L);
+
+        assertEquals("test_input", standardScaler.getInputCol());
+        assertEquals(true, standardScaler.getWithMean());
+        assertEquals(false, standardScaler.getWithStd());
+        assertEquals("test_output", standardScaler.getOutputCol());
+        assertEquals("model_version_col", standardScaler.getModelVersionCol());
+        assertEquals(
+                EventTimeTumblingWindows.of(Time.milliseconds(3000)), 
standardScaler.getWindows());
+        assertEquals(3000L, standardScaler.getMaxAllowedModelDelayMs());
+    }
+
+    @Test
+    public void testOutputSchema() {
+        Table renamedTable = inputTable.as("test_id", "test_input");
+        OnlineStandardScaler standardScaler =
+                new OnlineStandardScaler()
+                        .setInputCol("test_input")
+                        .setOutputCol("test_output")
+                        .setModelVersionCol("model_version_col");
+        Table output = 
standardScaler.fit(renamedTable).transform(renamedTable)[0];
+
+        assertEquals(
+                Arrays.asList("test_id", "test_input", "test_output", 
"model_version_col"),
+                output.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testFitAndPredictWithEventTimeWindow() throws Exception {
+        OnlineStandardScaler standardScaler = new OnlineStandardScaler();
+        Table output;
+        int windowSizeMs = 3000;
+
+        // Tests event time window with maxAllowedModelDelayMs as 0.
+        standardScaler
+                
.setWindows(EventTimeTumblingWindows.of(Time.milliseconds(windowSizeMs)))
+                .setModelVersionCol("modelVersionCol");
+        output = 
standardScaler.fit(inputTableWithEventTime).transform(inputTableWithEventTime)[0];
+        verifyUsedModelVersion(

Review Comment:
   I understand, thanks for the explanation.



##########
docs/content/docs/operators/feature/onlinestandardscaler.md:
##########
@@ -0,0 +1,261 @@
+---
+title: "OnlineStandardScaler"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/onlinestandardscaler.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## OnlineStandardScaler
+
+An Estimator which implements the online standard scaling algorithm, which 
+is the online version of StandardScaler.
+
+OnlineStandardScaler splits the input data by the user-specified window 
strategy.
+For each window, it computes the mean and standard deviation using the data 
seen
+so far (i.e., not only the data in the current window, but also the history 
data).
+The model data generated by OnlineStandardScaler is a model stream. 
+There is one model data for each window.
+
+During the inference phase (i.e., using OnlineStandardScalerModel for 
prediction),
+users could output the model version that is used for predicting each data 
point. 
+Moreover,
+- When the train data and test data both contains event time, users could 
+specify the maximum difference between timestamp of the input and model data,
+which enforces to use a relatively fresh model for prediction.
+- Otherwise, the prediction process always use the current model data for 
prediction.
+
+
+### Input Columns
+
+| Param name | Type   | Default   | Description            |
+|:-----------|:-------|:----------|:-----------------------|
+| inputCol   | Vector | `"input"` | Features to be scaled. |
+
+### Output Columns
+
+| Param name | Type   | Default    | Description      |
+|:-----------|:-------|:-----------|:-----------------|
+| outputCol  | Vector | `"output"` | Scaled features. |
+
+### Parameters
+
+Below are the parameters required by `OnlineStandardScalerModel`.
+
+| Key                    | Default    | Type    | Required | Description       
                                                                                
                                                                                
             |
+|------------------------|------------|---------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| inputCol               | `"input"`  | String  | no       | Input column 
name.                                                                           
                                                                                
                  |
+| outputCol              | `"output"` | String  | no       | Output column 
name.                                                                           
                                                                                
                 |
+| withMean               | `false`    | Boolean | no       | Whether centers 
the data with mean before scaling.                                              
                                                                                
               |
+| withStd                | `true`     | Boolean | no       | Whether scales 
the data with standard deviation.                                               
                                                                                
                |
+ | modelVersionCol        | `null`     | String  | no       | The version of 
the model data that the input data is predicted with.                           
                                                                                
                |
+ | maxAllowedModelDelayMs | `0L`       | Long    | no       | The maximum 
difference between timestamp of the input record and model data when using the 
model data to predict that input record. This param only works when the input 
contains event time." |
+
+`OnlineStandardScaler` needs parameters above and also below.
+
+| Key     | Default                       | Type    | Required | Description   
                                                                 |
+|---------|-------------------------------|---------|----------|--------------------------------------------------------------------------------|
+| windows | `GlobalWindows.getInstance()` | Windows | no       | Windowing 
strategy that determines how to create mini-batches from input data. |
+
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.common.window.EventTimeTumblingWindows;
+import org.apache.flink.ml.feature.standardscaler.OnlineStandardScaler;
+import org.apache.flink.ml.feature.standardscaler.OnlineStandardScalerModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Simple program that trains a OnlineStandardScaler model and uses it for 
feature engineering. */
+public class OnlineStandardScalerExample {
+       public static void main(String[] args) {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env);
+
+               // Generates input data.
+               List<Row> inputData =
+                       Arrays.asList(
+                               Row.of(0L, Vectors.dense(-2.5, 9, 1)),
+                               Row.of(1000L, Vectors.dense(1.4, -5, 1)),
+                               Row.of(2000L, Vectors.dense(2, -1, -2)),
+                               Row.of(6000L, Vectors.dense(0.7, 3, 1)),
+                               Row.of(7000L, Vectors.dense(0, 1, 1)),
+                               Row.of(8000L, Vectors.dense(0.5, 0, -2)),
+                               Row.of(9000L, Vectors.dense(0.4, 1, 1)),
+                               Row.of(10000L, Vectors.dense(0.3, 2, 1)),
+                               Row.of(11000L, Vectors.dense(0.5, 1, -2)));
+
+               DataStream<Row> inputStream = env.fromCollection(inputData);
+
+               DataStream<Row> inputStreamWithEventTime =
+                       inputStream.assignTimestampsAndWatermarks(
+                               WatermarkStrategy.<Row>forMonotonousTimestamps()
+                                       .withTimestampAssigner(
+                                               
(SerializableTimestampAssigner<Row>)
+                                                       (element, 
recordTimestamp) ->
+                                                               
element.getFieldAs(0)));
+
+               Table inputTable =
+                       tEnv.fromDataStream(
+                                       inputStreamWithEventTime,
+                                       Schema.newBuilder()
+                                               .column("f0", 
DataTypes.BIGINT())
+                                               .column("f1", 
DataTypes.RAW(DenseVectorTypeInfo.INSTANCE))
+                                               .columnByMetadata("rowtime", 
"TIMESTAMP_LTZ(3)")
+                                               .watermark("rowtime", 
"SOURCE_WATERMARK()")
+                                               .build())
+                               .as("id", "input");
+
+               // Creates an OnlineStandardScaler object and initializes its 
parameters.
+               long windowSizeMs = 3000;
+               OnlineStandardScaler onlineStandardScaler =
+                       new OnlineStandardScaler()
+                               
.setWindows(EventTimeTumblingWindows.of(Time.milliseconds(windowSizeMs)))
+                               .setModelVersionCol("modelVersionCol");
+
+               // Trains the OnlineStandardScaler Model.
+               OnlineStandardScalerModel model = 
onlineStandardScaler.fit(inputTable);
+
+               // Uses the OnlineStandardScaler Model for predictions.
+               Table outputTable = model.transform(inputTable)[0];
+
+               // Extracts and displays the results.
+               for (CloseableIterator<Row> it = 
outputTable.execute().collect(); it.hasNext(); ) {
+                       Row row = it.next();
+                       DenseVector inputValue = (DenseVector) 
row.getField(onlineStandardScaler.getInputCol());
+                       DenseVector outputValue =
+                               (DenseVector) 
row.getField(onlineStandardScaler.getOutputCol());
+                       long modelVersion = 
row.getFieldAs(onlineStandardScaler.getModelVersionCol());
+                       System.out.printf(
+                               "Input Value: %s\tOutput Value: %s\tModel 
Version: %s\n",
+                               inputValue, outputValue, modelVersion);
+               }
+       }
+}
+

Review Comment:
   Is this addressed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to