This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45ba3035d00f890540967693d9fd9e43b8b5e0e4
Author: Chesnay Schepler <[email protected]>
AuthorDate: Mon Mar 21 19:37:37 2022 +0100

    [FLINK-26842][python][tests] Port Scala code to Java
---
 flink-python/pom.xml                               |  18 --
 .../pyflink/table/tests/test_descriptor.py         |   4 +-
 flink-python/pyflink/testing/test_case_utils.py    |   4 +-
 .../table/legacyutils/ByteMaxAggFunction.java      |  76 +++++
 .../flink/table/legacyutils/CustomAssigner.java    |  32 +++
 .../flink/table/legacyutils/CustomExtractor.java   |  69 +++++
 .../flink/table/legacyutils/MaxAccumulator.java    |  25 ++
 .../apache/flink/table/legacyutils/RichFunc0.java  |  69 +++++
 .../flink/table/legacyutils/RowCollector.java      |  79 ++++++
 .../apache/flink/table/legacyutils/RowSink.java    |  32 +++
 .../apache/flink/table/legacyutils/TableFunc1.java |  42 +++
 .../flink/table/legacyutils/TestAppendSink.java    |  73 +++++
 .../legacyutils/TestCollectionTableFactory.java    | 307 +++++++++++++++++++++
 .../flink/table/legacyutils/TestRetractSink.java   |  63 +++++
 .../flink/table/legacyutils/TestUpsertSink.java    |  87 ++++++
 .../legacyutils/TestCollectionTableFactory.scala   | 253 -----------------
 .../legacyutils/legacyTestingDescriptors.scala     |  80 ------
 .../table/legacyutils/legacyTestingFunctions.scala | 152 ----------
 .../table/legacyutils/legacyTestingSinks.scala     | 200 --------------
 19 files changed, 958 insertions(+), 707 deletions(-)

diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 4204436..46371a4 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -470,24 +470,6 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
-                       <!-- This is only a temporary solution until FLINK-22872 is fixed. -->
-                       <!-- It compiles `org.apache.flink.table.legacyutils` containing code from the old planner. -->
-                       <!-- We should not start adding more Scala code. Please remove this as soon as possible. -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <executions>
-                                       <!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
-                                                Scala classes can be resolved later in the (Java) test-compile phase -->
-                                       <execution>
-                                               <id>scala-test-compile</id>
-                                               <phase>process-test-resources</phase>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
                </plugins>
        </build>
 </project>
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index c973d9c..e63429b 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -50,8 +50,8 @@ class RowTimeDescriptorTests(PyFlinkTestCase):
             'rowtime.timestamps.class':
                 'org.apache.flink.table.legacyutils.CustomExtractor',
             'rowtime.timestamps.serialized':
-                'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvctj'
-                'ZLTGK9XvxAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
+                'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvcl4'
+                'ozwVLIwG6AgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
                 '50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd'
                 'AACdHM'}
         self.assertEqual(expected, properties)
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index c8749648..8abe31f 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -107,8 +107,8 @@ class PyFlinkTestCase(unittest.TestCase):
     @classmethod
     def to_py_list(cls, actual):
         py_list = []
-        for i in range(0, actual.length()):
-            py_list.append(actual.apply(i))
+        for i in range(0, actual.size()):
+            py_list.append(actual.get(i))
         return py_list
 
     @classmethod
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
new file mode 100644
index 0000000..ad9ebb4
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.functions.AggregateFunction;
+
+/** {@link AggregateFunction} for {@link Byte}. */
+public class ByteMaxAggFunction extends AggregateFunction<Byte, MaxAccumulator<Byte>> {
+
+    private static final long serialVersionUID = 1233840393767061909L;
+
+    @Override
+    public MaxAccumulator<Byte> createAccumulator() {
+        final MaxAccumulator<Byte> acc = new MaxAccumulator<>();
+        resetAccumulator(acc);
+        return acc;
+    }
+
+    public void accumulate(MaxAccumulator<Byte> acc, Byte value) {
+        if (value != null) {
+            if (!acc.f1 || Byte.compare(acc.f0, value) < 0) {
+                acc.f0 = value;
+                acc.f1 = true;
+            }
+        }
+    }
+
+    @Override
+    public Byte getValue(MaxAccumulator<Byte> acc) {
+        if (acc.f1) {
+            return acc.f0;
+        } else {
+            return null;
+        }
+    }
+
+    public void merge(MaxAccumulator<Byte> acc, Iterable<MaxAccumulator<Byte>> its) {
+        its.forEach(
+                a -> {
+                    if (a.f1) {
+                        accumulate(acc, a.f0);
+                    }
+                });
+    }
+
+    public void resetAccumulator(MaxAccumulator<Byte> acc) {
+        acc.f0 = 0;
+        acc.f1 = false;
+    }
+
+    @Override
+    public TypeInformation<MaxAccumulator<Byte>> getAccumulatorType() {
+        return new TupleTypeInfo(
+                MaxAccumulator.class,
+                BasicTypeInfo.BYTE_TYPE_INFO,
+                BasicTypeInfo.BOOLEAN_TYPE_INFO);
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
new file mode 100644
index 0000000..d69cce5
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
+import org.apache.flink.types.Row;
+
+/** A watermark assigner that throws an exception if a watermark is requested. 
*/
+public class CustomAssigner extends PunctuatedWatermarkAssigner {
+    private static final long serialVersionUID = -4900176786361416000L;
+
+    @Override
+    public Watermark getWatermark(Row row, long timestamp) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
new file mode 100644
index 0000000..cf66733
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.util.Preconditions;
+
+/** A timestamp extractor that looks for the SQL_TIMESTAMP "ts" field. */
+public class CustomExtractor extends TimestampExtractor {
+    private static final long serialVersionUID = 6784900460276023738L;
+
+    private final String field = "ts";
+
+    @Override
+    public String[] getArgumentFields() {
+        return new String[] {field};
+    }
+
+    @Override
+    public void validateArgumentFields(TypeInformation<?>[] 
argumentFieldTypes) {
+        if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) {
+            throw new ValidationException(
+                    String.format(
+                            "Field 'ts' must be of type Timestamp but is of 
type %s.",
+                            argumentFieldTypes[0]));
+        }
+    }
+
+    @Override
+    public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
+        ResolvedFieldReference fieldAccess = fieldAccesses[0];
+        Preconditions.checkArgument(fieldAccess.resultType() == 
Types.SQL_TIMESTAMP);
+        FieldReferenceExpression fieldReferenceExpr =
+                new FieldReferenceExpression(
+                        fieldAccess.name(),
+                        
TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()),
+                        0,
+                        fieldAccess.fieldIndex());
+        return ApiExpressionUtils.unresolvedCall(
+                BuiltInFunctionDefinitions.CAST,
+                fieldReferenceExpr,
+                ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()));
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
new file mode 100644
index 0000000..191aa87
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** Utility class to make working with tuples more readable. */
+public class MaxAccumulator<T> extends Tuple2<T, Boolean> {
+    private static final long serialVersionUID = 6089142148200600733L;
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
new file mode 100644
index 0000000..a757fe9
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import org.junit.Assert;
+
+/**
+ * Testing scalar function to verify that lifecycle methods are called in the 
expected order and
+ * only once.
+ */
+public class RichFunc0 extends ScalarFunction {
+    private static final long serialVersionUID = 931156471687322386L;
+
+    private boolean openCalled = false;
+    private boolean closeCalled = false;
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        if (openCalled) {
+            Assert.fail("Open called more than once.");
+        } else {
+            openCalled = true;
+        }
+        if (closeCalled) {
+            Assert.fail("Close called before open.");
+        }
+    }
+
+    public void eval(int index) {
+        if (!openCalled) {
+            Assert.fail("Open was not called before eval.");
+        }
+        if (closeCalled) {
+            Assert.fail("Close called before eval.");
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (closeCalled) {
+            Assert.fail("Close called more than once.");
+        } else {
+            closeCalled = true;
+        }
+        if (!openCalled) {
+            Assert.fail("Open was not called before close.");
+        }
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
new file mode 100644
index 0000000..9fed516
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** A collector for storing results in memory. */
+class RowCollector {
+    private static final ArrayDeque<Tuple2<Boolean, Row>> sink = new 
ArrayDeque<>();
+
+    public static void addValue(Tuple2<Boolean, Row> value) {
+        synchronized (sink) {
+            sink.add(value.copy());
+        }
+    }
+
+    public static List<Tuple2<Boolean, Row>> getAndClearValues() {
+        final ArrayList<Tuple2<Boolean, Row>> out = new ArrayList<>(sink);
+        sink.clear();
+        return out;
+    }
+
+    public static List<String> retractResults(List<Tuple2<Boolean, Row>> 
results) {
+        final Map<String, Integer> retracted =
+                results.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        r -> r.f1.toString(),
+                                        Collectors.mapping(
+                                                r -> r.f0 ? 1 : -1,
+                                                Collectors.reducing(
+                                                        0, (left, right) -> 
left + right))));
+
+        if (retracted.values().stream().anyMatch(c -> c < 0)) {
+            throw new AssertionError("Received retracted rows which have not 
been accumulated.");
+        }
+
+        return retracted.entrySet().stream()
+                .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> 
e.getKey()))
+                .collect(Collectors.toList());
+    }
+
+    public static List<String> upsertResults(List<Tuple2<Boolean, Row>> 
results, int[] keys) {
+        final HashMap<Row, String> upserted = new HashMap<>();
+        for (Tuple2<Boolean, Row> r : results) {
+            final Row key = Row.project(r.f1, keys);
+            if (r.f0) {
+                upserted.put(key, r.f1.toString());
+            } else {
+                upserted.remove(key);
+            }
+        }
+        return new ArrayList<>(upserted.values());
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
new file mode 100644
index 0000000..0051d9c
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.types.Row;
+
+/** A sink that stores data in the {@link RowCollector}. */
+public class RowSink implements SinkFunction<Tuple2<Boolean, Row>> {
+    private static final long serialVersionUID = -7264802354440479084L;
+
+    @Override
+    public void invoke(Tuple2<Boolean, Row> value, Context context) throws 
Exception {
+        RowCollector.addValue(value);
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
new file mode 100644
index 0000000..4e34e65
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.table.functions.TableFunction;
+
+/** A function that splits strings, optionally adding a prefix. */
+public class TableFunc1 extends TableFunction<String> {
+
+    private static final long serialVersionUID = -5471603822898040617L;
+
+    public void eval(String str) {
+        if (str.contains("#")) {
+            for (String s : str.split("#")) {
+                collect(s);
+            }
+        }
+    }
+
+    public void eval(String str, String prefix) {
+        if (str.contains("#")) {
+            for (String s : str.split("#")) {
+                collect(prefix + s);
+            }
+        }
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
new file mode 100644
index 0000000..cde5467
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/** Testing append sink. */
+public class TestAppendSink implements AppendStreamTableSink<Row> {
+
+    private String[] fNames = null;
+    private TypeInformation<?>[] fTypes = null;
+
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+        return dataStream
+                .map(
+                        new MapFunction<Row, Tuple2<Boolean, Row>>() {
+                            private static final long serialVersionUID = 
4671583708680989488L;
+
+                            @Override
+                            public Tuple2<Boolean, Row> map(Row value) throws 
Exception {
+                                return Tuple2.of(true, value);
+                            }
+                        })
+                .addSink(new RowSink());
+    }
+
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return new RowTypeInfo(fTypes, fNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fTypes;
+    }
+
+    @Override
+    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] 
fieldTypes) {
+        final TestAppendSink copy = new TestAppendSink();
+        copy.fNames = fieldNames;
+        copy.fTypes = fieldTypes;
+        return copy;
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
new file mode 100644
index 0000000..dfa2db4
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory for the testing collection-backed sources and sinks. */
+public class TestCollectionTableFactory
+        implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row> {
+
+    private static boolean isStreaming = true;
+
+    private static final List<Row> SOURCE_DATA = new LinkedList<>();
+    private static final List<Row> DIM_DATA = new LinkedList<>();
+    private static final List<Row> RESULT = new LinkedList<>();
+
+    private long emitIntervalMS = -1L;
+
+    @Override
+    public TableSource<Row> createTableSource(Map<String, String> properties) {
+        return getCollectionSource(properties, isStreaming);
+    }
+
+    @Override
+    public TableSink<Row> createTableSink(Map<String, String> properties) {
+        return getCollectionSink(properties);
+    }
+
+    @Override
+    public StreamTableSource<Row> createStreamTableSource(Map<String, String> 
properties) {
+        return getCollectionSource(properties, true);
+    }
+
+    @Override
+    public StreamTableSink<Row> createStreamTableSink(Map<String, String> 
properties) {
+        return getCollectionSink(properties);
+    }
+
+    private CollectionTableSource getCollectionSource(
+            Map<String, String> props, boolean isStreaming) {
+        final DescriptorProperties properties = new DescriptorProperties();
+        properties.putProperties(props);
+        final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
+        final Optional<Integer> parallelism = 
properties.getOptionalInt("parallelism");
+        return new CollectionTableSource(emitIntervalMS, schema, isStreaming, 
parallelism);
+    }
+
+    private CollectionTableSink getCollectionSink(Map<String, String> props) {
+        final DescriptorProperties properties = new DescriptorProperties();
+        properties.putProperties(props);
+        final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
+        return new CollectionTableSink((RowTypeInfo) schema.toRowType());
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        final HashMap<String, String> context = new HashMap<>();
+        context.put(ConnectorDescriptorValidator.CONNECTOR, "COLLECTION");
+        return context;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        return Arrays.asList("*");
+    }
+
+    private static class CollectionTableSource
+            implements StreamTableSource<Row>, LookupableTableSource<Row> {
+        private final long emitIntervalMs;
+        private final TableSchema schema;
+        private final boolean isStreaming;
+        private final Optional<Integer> parallelism;
+        private final TypeInformation<Row> rowType;
+
+        private CollectionTableSource(
+                long emitIntervalMs,
+                TableSchema schema,
+                boolean isStreaming,
+                Optional<Integer> parallelism) {
+            this.emitIntervalMs = emitIntervalMs;
+            this.schema = schema;
+            this.isStreaming = isStreaming;
+            this.parallelism = parallelism;
+            this.rowType = schema.toRowType();
+        }
+
+        @Override
+        public boolean isBounded() {
+            return !isStreaming;
+        }
+
+        @Override
+        public DataStream<Row> getDataStream(StreamExecutionEnvironment 
streamEnv) {
+            final DataStreamSource<Row> dataStream =
+                    streamEnv.createInput(
+                            new TestCollectionInputFormat<>(
+                                    emitIntervalMs,
+                                    SOURCE_DATA,
+                                    rowType.createSerializer(new 
ExecutionConfig())),
+                            rowType);
+            if (parallelism.isPresent()) {
+                dataStream.setParallelism(parallelism.get());
+            }
+            return dataStream;
+        }
+
+        @Override
+        public TypeInformation<Row> getReturnType() {
+            return rowType;
+        }
+
+        @Override
+        public TableSchema getTableSchema() {
+            return schema;
+        }
+
+        @Override
+        public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
+            final String[] schemaFieldNames = schema.getFieldNames();
+            final int[] keys =
+                    Arrays.stream(lookupKeys)
+                            .map(
+                                    k -> {
+                                        for (int x = 0; x < 
schemaFieldNames.length; x++) {
+                                            if (k.equals(schemaFieldNames[x])) 
{
+                                                return x;
+                                            }
+                                        }
+                                        throw new IllegalStateException();
+                                    })
+                            .mapToInt(i -> i)
+                            .toArray();
+
+            return new TemporalTableFetcher(DIM_DATA, keys);
+        }
+
+        @Override
+        public AsyncTableFunction<Row> getAsyncLookupFunction(String[] 
lookupKeys) {
+            return null;
+        }
+
+        @Override
+        public boolean isAsyncEnabled() {
+            return false;
+        }
+    }
+
+    private static class CollectionTableSink implements 
AppendStreamTableSink<Row> {
+        private final RowTypeInfo outputType;
+
+        private CollectionTableSink(RowTypeInfo outputType) {
+            this.outputType = outputType;
+        }
+
+        @Override
+        public RowTypeInfo getOutputType() {
+            return outputType;
+        }
+
+        @Override
+        public String[] getFieldNames() {
+            return outputType.getFieldNames();
+        }
+
+        @Override
+        public TypeInformation<?>[] getFieldTypes() {
+            return outputType.getFieldTypes();
+        }
+
+        @Override
+        public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) 
{
+            return dataStream.addSink(new 
UnsafeMemorySinkFunction(outputType)).setParallelism(1);
+        }
+
+        @Override
+        public TableSink<Row> configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+            return this;
+        }
+    }
+
+    private static class UnsafeMemorySinkFunction extends 
RichSinkFunction<Row> {
+        private static final long serialVersionUID = -7880686562734099699L;
+
+        private final TypeInformation<Row> outputType;
+        private TypeSerializer<Row> serializer = null;
+
+        private UnsafeMemorySinkFunction(TypeInformation<Row> outputType) {
+            this.outputType = outputType;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            serializer = outputType.createSerializer(new ExecutionConfig());
+        }
+
+        @Override
+        public void invoke(Row row, Context context) throws Exception {
+            RESULT.add(serializer.copy(row));
+        }
+    }
+
+    private static class TestCollectionInputFormat<T> extends 
CollectionInputFormat<T> {
+
+        private static final long serialVersionUID = -3222731547793350189L;
+
+        private final long emitIntervalMs;
+
+        public TestCollectionInputFormat(
+                long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> 
serializer) {
+            super(dataSet, serializer);
+            this.emitIntervalMs = emitIntervalMs;
+        }
+
+        @Override
+        public boolean reachedEnd() throws IOException {
+            if (emitIntervalMs > 0) {
+                try {
+                    Thread.sleep(emitIntervalMs);
+                } catch (InterruptedException e) {
+                }
+            }
+            return super.reachedEnd();
+        }
+    }
+
+    private static class TemporalTableFetcher extends TableFunction<Row> {
+        private static final long serialVersionUID = 6248306950388784015L;
+
+        private final List<Row> dimData;
+        private final int[] keys;
+
+        private TemporalTableFetcher(List<Row> dimData, int[] keys) {
+            this.dimData = dimData;
+            this.keys = keys;
+        }
+
+        public void eval(Row values) {
+            for (Row data : dimData) {
+                boolean matched = true;
+                int idx = 0;
+                while (matched && idx < keys.length) {
+                    final Object dimField = data.getField(keys[idx]);
+                    final Object inputField = values.getField(idx);
+                    matched = dimField.equals(inputField);
+                    idx += 1;
+                }
+                if (matched) {
+                    // copy the row data
+                    final Row ret = new Row(data.getArity());
+                    for (int x = 0; x < data.getArity(); x++) {
+                        ret.setField(x, data.getField(x));
+                    }
+                    collect(ret);
+                }
+            }
+        }
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
new file mode 100644
index 0000000..449aea7
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/** Testing retract sink. */
+public class TestRetractSink implements RetractStreamTableSink<Row> {
+
+    private String[] fNames = null;
+    private TypeInformation<?>[] fTypes = null;
+
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
Row>> dataStream) {
+        return dataStream.addSink(new RowSink());
+    }
+
+    @Override
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fTypes, fNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fTypes;
+    }
+
+    @Override
+    public TableSink<Tuple2<Boolean, Row>> configure(
+            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        final TestRetractSink copy = new TestRetractSink();
+        copy.fNames = fieldNames;
+        copy.fTypes = fieldTypes;
+        return copy;
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
new file mode 100644
index 0000000..1740d53
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.legacyutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.types.Row;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Testing upsert sink. */
+public class TestUpsertSink implements UpsertStreamTableSink<Row> {
+
+    private String[] fNames = null;
+    private TypeInformation<?>[] fTypes = null;
+
+    private final String[] expectedKeys;
+    private final boolean expectedIsAppendOnly;
+
+    public TestUpsertSink(String[] expectedKeys, boolean expectedIsAppendOnly) 
{
+        this.expectedKeys = expectedKeys;
+        this.expectedIsAppendOnly = expectedIsAppendOnly;
+    }
+
+    @Override
+    public void setKeyFields(String[] keys) {
+        assertThat(keys)
+                .as("Provided key fields do not match expected keys")
+                .containsExactlyInAnyOrder(expectedKeys);
+    }
+
+    @Override
+    public void setIsAppendOnly(Boolean isAppendOnly) {
+        assertThat(isAppendOnly)
+                .as("Provided isAppendOnly does not match expected 
isAppendOnly")
+                .isEqualTo(expectedIsAppendOnly);
+    }
+
+    @Override
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fTypes, fNames);
+    }
+
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
Row>> s) {
+        return s.addSink(new RowSink());
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fTypes;
+    }
+
+    @Override
+    public TableSink<Tuple2<Boolean, Row>> configure(
+            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        final TestUpsertSink copy = new TestUpsertSink(expectedKeys, 
expectedIsAppendOnly);
+        copy.fNames = fieldNames;
+        copy.fTypes = fieldTypes;
+        return copy;
+    }
+}
diff --git 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
 
b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
deleted file mode 100644
index 0d8e03a..0000000
--- 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.io.CollectionInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, 
DataStreamSource}
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.api.TableSchema
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{StreamTableSinkFactory, 
StreamTableSourceFactory}
-import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
-import 
org.apache.flink.table.legacyutils.TestCollectionTableFactory.{getCollectionSink,
 getCollectionSource}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, 
TableSink}
-import org.apache.flink.table.sources.{LookupableTableSource, 
StreamTableSource, TableSource}
-import org.apache.flink.types.Row
-
-import java.io.IOException
-import java.util
-import java.util.{Optional, ArrayList => JArrayList, LinkedList => 
JLinkedList, List => JList, Map => JMap}
-
-import scala.collection.JavaConversions._
-
-/**
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-@deprecated
-class TestCollectionTableFactory
-  extends StreamTableSourceFactory[Row]
-  with StreamTableSinkFactory[Row]
-{
-
-  override def createTableSource(properties: JMap[String, String]): 
TableSource[Row] = {
-    getCollectionSource(properties, TestCollectionTableFactory.isStreaming)
-  }
-
-  override def createTableSink(properties: JMap[String, String]): 
TableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createStreamTableSource(properties: JMap[String, String]): 
StreamTableSource[Row] = {
-    getCollectionSource(properties, isStreaming = true)
-  }
-
-  override def createStreamTableSink(properties: JMap[String, String]): 
StreamTableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def requiredContext(): JMap[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR, "COLLECTION")
-    context
-  }
-
-  override def supportedProperties(): JList[String] = {
-    val supported = new JArrayList[String]()
-    supported.add("*")
-    supported
-  }
-}
-
-@deprecated
-object TestCollectionTableFactory {
-  var isStreaming: Boolean = true
-
-  val SOURCE_DATA = new JLinkedList[Row]()
-  val DIM_DATA = new JLinkedList[Row]()
-  val RESULT = new JLinkedList[Row]()
-  private var emitIntervalMS = -1L
-
-  def initData(sourceData: JList[Row],
-      dimData: JList[Row] = List(),
-      emitInterval: Long = -1L): Unit ={
-    SOURCE_DATA.addAll(sourceData)
-    DIM_DATA.addAll(dimData)
-    emitIntervalMS = emitInterval
-  }
-
-  def reset(): Unit ={
-    RESULT.clear()
-    SOURCE_DATA.clear()
-    DIM_DATA.clear()
-    emitIntervalMS = -1L
-  }
-
-  def getCollectionSource(props: JMap[String, String],
-      isStreaming: Boolean): CollectionTableSource = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
-    val parallelism = properties.getOptionalInt("parallelism")
-    new CollectionTableSource(emitIntervalMS, schema, isStreaming, parallelism)
-  }
-
-  def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
-    new CollectionTableSink(schema.toRowType.asInstanceOf[RowTypeInfo])
-  }
-
-  /**
-    * Table source of collection.
-    */
-  class CollectionTableSource(
-      val emitIntervalMs: Long,
-      val schema: TableSchema,
-      val isStreaming: Boolean,
-      val parallelism: Optional[Integer])
-    extends StreamTableSource[Row]
-    with LookupableTableSource[Row] {
-
-    private val rowType: TypeInformation[Row] = schema.toRowType
-
-    override def isBounded: Boolean = !isStreaming
-
-    override def getDataStream(streamEnv: StreamExecutionEnvironment): 
DataStreamSource[Row] = {
-      val dataStream = streamEnv.createInput(new 
TestCollectionInputFormat[Row](emitIntervalMs,
-        SOURCE_DATA,
-        rowType.createSerializer(new ExecutionConfig)),
-        rowType)
-      if (parallelism.isPresent) {
-        dataStream.setParallelism(parallelism.get())
-      }
-      dataStream
-    }
-
-    override def getReturnType: TypeInformation[Row] = rowType
-
-    override def getTableSchema: TableSchema = {
-      schema
-    }
-
-    override def getLookupFunction(lookupKeys: Array[String]): 
TemporalTableFetcher = {
-      new TemporalTableFetcher(DIM_DATA, 
lookupKeys.map(schema.getFieldNames.indexOf(_)))
-    }
-
-    override def getAsyncLookupFunction(lookupKeys: Array[String]): 
AsyncTableFunction[Row] = null
-
-    override def isAsyncEnabled: Boolean = false
-  }
-
-  /**
-    * Table sink of collection.
-    */
-  class CollectionTableSink(val outputType: RowTypeInfo)
-      extends AppendStreamTableSink[Row] {
-
-    override def getOutputType: RowTypeInfo = outputType
-
-    override def getFieldNames: Array[String] = outputType.getFieldNames
-
-    override def getFieldTypes: Array[TypeInformation[_]] = {
-      outputType.getFieldTypes
-    }
-
-    override def consumeDataStream(dataStream: DataStream[Row]): 
DataStreamSink[_] = {
-      dataStream.addSink(new 
UnsafeMemorySinkFunction(outputType)).setParallelism(1)
-    }
-
-    override def configure(fieldNames: Array[String],
-        fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
-  }
-
-  /**
-    * Sink function of unsafe memory.
-    */
-  class UnsafeMemorySinkFunction(outputType: TypeInformation[Row]) extends 
RichSinkFunction[Row] {
-    private var serializer: TypeSerializer[Row] = _
-
-    override def open(param: Configuration): Unit = {
-      serializer = outputType.createSerializer(new ExecutionConfig)
-    }
-
-    @throws[Exception]
-    override def invoke(row: Row): Unit = {
-      RESULT.add(serializer.copy(row))
-    }
-  }
-
-  /**
-    * Collection inputFormat for testing.
-    */
-  class TestCollectionInputFormat[T](
-      val emitIntervalMs: Long,
-      val dataSet: java.util.Collection[T],
-      val serializer: TypeSerializer[T])
-    extends CollectionInputFormat[T](dataSet, serializer) {
-    @throws[IOException]
-    override def reachedEnd: Boolean = {
-      if (emitIntervalMs > 0) {
-        try
-          Thread.sleep(emitIntervalMs)
-        catch {
-          case _: InterruptedException =>
-        }
-      }
-      super.reachedEnd
-    }
-  }
-
-  /**
-    * Dimension table source fetcher.
-    */
-  class TemporalTableFetcher(
-      val dimData: JLinkedList[Row],
-      val keys: Array[Int]) extends TableFunction[Row] {
-
-    @throws[Exception]
-    def eval(values: Any*): Unit = {
-      for (data <- dimData) {
-        var matched = true
-        var idx = 0
-        while (matched && idx < keys.length) {
-          val dimField = data.getField(keys(idx))
-          val inputField = values(idx)
-          matched = dimField.equals(inputField)
-          idx += 1
-        }
-        if (matched) {
-          // copy the row data
-          val ret = new Row(data.getArity)
-          0 until data.getArity foreach { idx =>
-            ret.setField(idx, data.getField(idx))
-          }
-          collect(ret)
-        }
-      }
-    }
-  }
-}
diff --git 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala
 
b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala
deleted file mode 100644
index f08d214..0000000
--- 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{DataTypes, ValidationException}
-import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression, 
FieldReferenceExpression, ResolvedFieldReference}
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-import org.apache.flink.table.sources.tsextractors.TimestampExtractor
-import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
-import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.types.Row
-
-/*
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-
-@deprecated
-class CustomAssigner extends PunctuatedWatermarkAssigner() {
-  override def getWatermark(row: Row, timestamp: Long): Watermark =
-    throw new UnsupportedOperationException()
-}
-
-@deprecated
-class CustomExtractor(val field: String) extends TimestampExtractor {
-  def this() = {
-    this("ts")
-  }
-
-  override def getArgumentFields: Array[String] = Array(field)
-
-  override def validateArgumentFields(argumentFieldTypes: 
Array[TypeInformation[_]]): Unit = {
-    argumentFieldTypes(0) match {
-      case Types.SQL_TIMESTAMP =>
-      case _ =>
-        throw new ValidationException(
-          s"Field 'ts' must be of type Timestamp but is of type 
${argumentFieldTypes(0)}.")
-    }
-  }
-
-  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
Expression = {
-    val fieldAccess = fieldAccesses(0)
-    require(fieldAccess.resultType == Types.SQL_TIMESTAMP)
-    val fieldReferenceExpr = new FieldReferenceExpression(
-      fieldAccess.name,
-      TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType),
-      0,
-      fieldAccess.fieldIndex)
-    ApiExpressionUtils.unresolvedCall(
-      BuiltInFunctionDefinitions.CAST,
-      fieldReferenceExpr,
-      ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()))
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: CustomExtractor => field == that.field
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    field.hashCode
-  }
-}
diff --git 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala
 
b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala
deleted file mode 100644
index 9c032ca..0000000
--- 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, 
ScalarFunction, TableFunction}
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-
-import java.lang.{Iterable => JIterable}
-import org.junit.Assert
-
-/*
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-
-@deprecated
-class RichFunc0 extends ScalarFunction {
-  var openCalled = false
-  var closeCalled = false
-
-  override def open(context: FunctionContext): Unit = {
-    super.open(context)
-    if (openCalled) {
-      Assert.fail("Open called more than once.")
-    } else {
-      openCalled = true
-    }
-    if (closeCalled) {
-      Assert.fail("Close called before open.")
-    }
-  }
-
-  def eval(index: Int): Int = {
-    if (!openCalled) {
-      Assert.fail("Open was not called before eval.")
-    }
-    if (closeCalled) {
-      Assert.fail("Close called before eval.")
-    }
-
-    index + 1
-  }
-
-  override def close(): Unit = {
-    super.close()
-    if (closeCalled) {
-      Assert.fail("Close called more than once.")
-    } else {
-      closeCalled = true
-    }
-    if (!openCalled) {
-      Assert.fail("Open was not called before close.")
-    }
-  }
-}
-
-@deprecated
-class MaxAccumulator[T] extends JTuple2[T, Boolean]
-
-@deprecated
-abstract class MaxAggFunction[T](implicit ord: Ordering[T])
-  extends AggregateFunction[T, MaxAccumulator[T]] {
-
-  override def createAccumulator(): MaxAccumulator[T] = {
-    val acc = new MaxAccumulator[T]
-    acc.f0 = getInitValue
-    acc.f1 = false
-    acc
-  }
-
-  def accumulate(acc: MaxAccumulator[T], value: Any): Unit = {
-    if (value != null) {
-      val v = value.asInstanceOf[T]
-      if (!acc.f1 || ord.compare(acc.f0, v) < 0) {
-        acc.f0 = v
-        acc.f1 = true
-      }
-    }
-  }
-
-  override def getValue(acc: MaxAccumulator[T]): T = {
-    if (acc.f1) {
-      acc.f0
-    } else {
-      null.asInstanceOf[T]
-    }
-  }
-
-  def merge(acc: MaxAccumulator[T], its: JIterable[MaxAccumulator[T]]): Unit = 
{
-    val iter = its.iterator()
-    while (iter.hasNext) {
-      val a = iter.next()
-      if (a.f1) {
-        accumulate(acc, a.f0)
-      }
-    }
-  }
-
-  def resetAccumulator(acc: MaxAccumulator[T]): Unit = {
-    acc.f0 = getInitValue
-    acc.f1 = false
-  }
-
-  override def getAccumulatorType: TypeInformation[MaxAccumulator[T]] = {
-    new TupleTypeInfo(
-      classOf[MaxAccumulator[T]],
-      getValueTypeInfo,
-      BasicTypeInfo.BOOLEAN_TYPE_INFO)
-  }
-
-  def getInitValue: T
-
-  def getValueTypeInfo: TypeInformation[_]
-}
-
-@deprecated
-class ByteMaxAggFunction extends MaxAggFunction[Byte] {
-  override def getInitValue: Byte = 0.toByte
-  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
-}
-
-@deprecated
-class TableFunc1 extends TableFunction[String] {
-  def eval(str: String): Unit = {
-    if (str.contains("#")){
-      str.split("#").foreach(collect)
-    }
-  }
-
-  def eval(str: String, prefix: String): Unit = {
-    if (str.contains("#")) {
-      str.split("#").foreach(s => collect(prefix + s))
-    }
-  }
-}
diff --git 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala
 
b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala
deleted file mode 100644
index 871f6a1..0000000
--- 
a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.table.sinks.{AppendStreamTableSink, 
RetractStreamTableSink, TableSink, UpsertStreamTableSink}
-import org.apache.flink.types.Row
-
-import java.lang.{Boolean => JBool}
-
-import scala.collection.mutable
-
-/*
- * Testing utils adopted from legacy planner until the Python code is updated.
- */
-
-@deprecated
-private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def consumeDataStream(dataStream: DataStream[Row]): 
DataStreamSink[_] = {
-    dataStream.map(
-      new MapFunction[Row, JTuple2[JBool, Row]] {
-        override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, 
value)
-      })
-      .addSink(new RowSink)
-  }
-
-  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, 
fNames)
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
-    val copy = new TestAppendSink
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-@deprecated
-private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): 
DataStreamSink[_] = {
-    s.addSink(new RowSink)
-  }
-
-  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, 
fNames)
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = 
{
-    val copy = new TestRetractSink
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-@deprecated
-private[flink] class TestUpsertSink(
-    expectedKeys: Array[String],
-    expectedIsAppendOnly: Boolean)
-  extends UpsertStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def setKeyFields(keys: Array[String]): Unit =
-    if (keys != null) {
-      if 
(!expectedKeys.sorted.mkString(",").equals(keys.sorted.mkString(","))) {
-        throw new AssertionError("Provided key fields do not match expected 
keys")
-      }
-    } else {
-      if (expectedKeys != null) {
-        throw new AssertionError("Provided key fields should not be null.")
-      }
-    }
-
-  override def setIsAppendOnly(isAppendOnly: JBool): Unit =
-    if (expectedIsAppendOnly != isAppendOnly) {
-      throw new AssertionError("Provided isAppendOnly does not match expected 
isAppendOnly")
-    }
-
-  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, 
fNames)
-
-  override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): 
DataStreamSink[_] = {
-    s.addSink(new RowSink)
-  }
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = 
{
-    val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly)
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-@deprecated
-class RowSink extends SinkFunction[JTuple2[JBool, Row]] {
-  override def invoke(value: JTuple2[JBool, Row]): Unit = 
RowCollector.addValue(value)
-}
-
-@deprecated
-object RowCollector {
-  private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] =
-    new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
-
-  def addValue(value: JTuple2[JBool, Row]): Unit = {
-
-    // make a copy
-    val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1))
-    sink.synchronized {
-      sink += copy
-    }
-  }
-
-  def getAndClearValues: List[JTuple2[JBool, Row]] = {
-    val out = sink.toList
-    sink.clear()
-    out
-  }
-
-  /** Converts a list of retraction messages into a list of final results. */
-  def retractResults(results: List[JTuple2[JBool, Row]]): List[String] = {
-
-    val retracted = results
-      .foldLeft(Map[String, Int]()){ (m: Map[String, Int], v: JTuple2[JBool, 
Row]) =>
-        val cnt = m.getOrElse(v.f1.toString, 0)
-        if (v.f0) {
-          m + (v.f1.toString -> (cnt + 1))
-        } else {
-          m + (v.f1.toString -> (cnt - 1))
-        }
-      }.filter{ case (_, c: Int) => c != 0 }
-
-    if (retracted.exists{ case (_, c: Int) => c < 0}) {
-      throw new AssertionError("Received retracted rows which have not been 
accumulated.")
-    }
-
-    retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) 
}.toList
-  }
-
-  /** Converts a list of upsert messages into a list of final results. */
-  def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): 
List[String] = {
-
-    def getKeys(r: Row): Row = Row.project(r, keys)
-
-    val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], 
r) =>
-      val key = getKeys(r.f1)
-      if (r.f0) {
-        o + (key -> r.f1.toString)
-      } else {
-        o - key
-      }
-    }
-
-    upserted.values.toList
-  }
-}

Reply via email to