This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 0b6b7f70 [FLINK-32292] Fix TableUtils.getRowTypeInfo when the input
contains Tuple
0b6b7f70 is described below
commit 0b6b7f70e45bebfa5f66e9405f152031607bc45a
Author: Zhipeng Zhang <[email protected]>
AuthorDate: Fri Jun 9 10:15:12 2023 +0800
[FLINK-32292] Fix TableUtils.getRowTypeInfo when the input contains Tuple
This closes #241.
---
.../java/org/apache/flink/ml/common/datastream/TableUtils.java | 1 +
.../java/org/apache/flink/ml/common/datastream/TableUtilsTest.java | 7 +++++++
2 files changed, 8 insertions(+)
diff --git
a/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/TableUtils.java
b/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/TableUtils.java
index 8d278502..8af46424 100644
---
a/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/TableUtils.java
+++
b/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/TableUtils.java
@@ -60,6 +60,7 @@ public class TableUtils {
LOGICAL_TYPE_ROOTS_USING_EXTERNAL_TYPE_INFO.add(LogicalTypeRoot.MAP);
LOGICAL_TYPE_ROOTS_USING_EXTERNAL_TYPE_INFO.add(LogicalTypeRoot.MULTISET);
LOGICAL_TYPE_ROOTS_USING_EXTERNAL_TYPE_INFO.add(LogicalTypeRoot.ROW);
+
LOGICAL_TYPE_ROOTS_USING_EXTERNAL_TYPE_INFO.add(LogicalTypeRoot.STRUCTURED_TYPE);
}
// Constructs a RowTypeInfo from the given schema. Currently, this
function does not support
diff --git
a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/TableUtilsTest.java
b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/TableUtilsTest.java
index b420ea4d..e357d80c 100644
---
a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/TableUtilsTest.java
+++
b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/TableUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.ml.common.datastream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.linalg.DenseMatrix;
import org.apache.flink.ml.linalg.DenseVector;
@@ -119,6 +120,12 @@ public class TableUtilsTest {
dataFields.add(new SparseVector(2, new int[] {0}, new double[] {0.1}));
preDefinedDataTypes.add(DataTypes.RAW(DenseMatrixTypeInfo.INSTANCE));
dataFields.add(new DenseMatrix(2, 2));
+ preDefinedDataTypes.add(
+ DataTypes.STRUCTURED(
+ Tuple2.class,
+ DataTypes.FIELD("f0", DataTypes.BIGINT()),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())));
+ dataFields.add(Tuple2.of(1L, 2L));
Schema.Builder builder = Schema.newBuilder();
for (int i = 0; i < preDefinedDataTypes.size(); i++) {