This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 45ba3035d00f890540967693d9fd9e43b8b5e0e4 Author: Chesnay Schepler <[email protected]> AuthorDate: Mon Mar 21 19:37:37 2022 +0100 [FLINK-26842][python][tests] Port Scala code to Java --- flink-python/pom.xml | 18 -- .../pyflink/table/tests/test_descriptor.py | 4 +- flink-python/pyflink/testing/test_case_utils.py | 4 +- .../table/legacyutils/ByteMaxAggFunction.java | 76 +++++ .../flink/table/legacyutils/CustomAssigner.java | 32 +++ .../flink/table/legacyutils/CustomExtractor.java | 69 +++++ .../flink/table/legacyutils/MaxAccumulator.java | 25 ++ .../apache/flink/table/legacyutils/RichFunc0.java | 69 +++++ .../flink/table/legacyutils/RowCollector.java | 79 ++++++ .../apache/flink/table/legacyutils/RowSink.java | 32 +++ .../apache/flink/table/legacyutils/TableFunc1.java | 42 +++ .../flink/table/legacyutils/TestAppendSink.java | 73 +++++ .../legacyutils/TestCollectionTableFactory.java | 307 +++++++++++++++++++++ .../flink/table/legacyutils/TestRetractSink.java | 63 +++++ .../flink/table/legacyutils/TestUpsertSink.java | 87 ++++++ .../legacyutils/TestCollectionTableFactory.scala | 253 ----------------- .../legacyutils/legacyTestingDescriptors.scala | 80 ------ .../table/legacyutils/legacyTestingFunctions.scala | 152 ---------- .../table/legacyutils/legacyTestingSinks.scala | 200 -------------- 19 files changed, 958 insertions(+), 707 deletions(-) diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 4204436..46371a4 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -470,24 +470,6 @@ under the License. </execution> </executions> </plugin> - <!-- This is only a temporary solution until FLINK-22872 is fixed. --> - <!-- It compiles `org.apache.flink.table.legacyutils` containing code from the old planner. --> - <!-- We should not start adding more Scala code. Please remove this as soon as possible. --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <!-- Run Scala compiler in the process-test-resources phase, so that dependencies on - Scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - </plugin> </plugins> </build> </project> diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py index c973d9c..e63429b 100644 --- a/flink-python/pyflink/table/tests/test_descriptor.py +++ b/flink-python/pyflink/table/tests/test_descriptor.py @@ -50,8 +50,8 @@ class RowTimeDescriptorTests(PyFlinkTestCase): 'rowtime.timestamps.class': 'org.apache.flink.table.legacyutils.CustomExtractor', 'rowtime.timestamps.serialized': - 'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvctj' - 'ZLTGK9XvxAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay' + 'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvcl4' + 'ozwVLIwG6AgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay' '50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd' 'AACdHM'} self.assertEqual(expected, properties) diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py index c8749648..8abe31f 100644 --- a/flink-python/pyflink/testing/test_case_utils.py +++ b/flink-python/pyflink/testing/test_case_utils.py @@ -107,8 +107,8 @@ class PyFlinkTestCase(unittest.TestCase): @classmethod def to_py_list(cls, actual): py_list = [] - for i in range(0, actual.length()): - py_list.append(actual.apply(i)) + for i in range(0, actual.size()): + py_list.append(actual.get(i)) return py_list @classmethod diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java new file mode 100644 index 0000000..ad9ebb4 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.table.functions.AggregateFunction; + +/** {@link AggregateFunction} for {@link Byte}. */ +public class ByteMaxAggFunction extends AggregateFunction<Byte, MaxAccumulator<Byte>> { + + private static final long serialVersionUID = 1233840393767061909L; + + @Override + public MaxAccumulator<Byte> createAccumulator() { + final MaxAccumulator<Byte> acc = new MaxAccumulator<>(); + resetAccumulator(acc); + return acc; + } + + public void accumulate(MaxAccumulator<Byte> acc, Byte value) { + if (value != null) { + if (!acc.f1 || Byte.compare(acc.f0, value) < 0) { + acc.f0 = value; + acc.f1 = true; + } + } + } + + @Override + public Byte getValue(MaxAccumulator<Byte> acc) { + if (acc.f1) { + return acc.f0; + } else { + return null; + } + } + + public void merge(MaxAccumulator<Byte> acc, Iterable<MaxAccumulator<Byte>> its) { + its.forEach( + a -> { + if (a.f1) { + accumulate(acc, a.f0); + } + }); + } + + public void resetAccumulator(MaxAccumulator<Byte> acc) { + acc.f0 = 0; + acc.f1 = false; + } + + @Override + public TypeInformation<MaxAccumulator<Byte>> getAccumulatorType() { + return new TupleTypeInfo( + MaxAccumulator.class, + BasicTypeInfo.BYTE_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO); + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java new file mode 100644 index 0000000..d69cce5 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner; +import org.apache.flink.types.Row; + +/** A watermark assigner that throws an exception if a watermark is requested. */ +public class CustomAssigner extends PunctuatedWatermarkAssigner { + private static final long serialVersionUID = -4900176786361416000L; + + @Override + public Watermark getWatermark(Row row, long timestamp) { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java new file mode 100644 index 0000000..cf66733 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedFieldReference; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.sources.tsextractors.TimestampExtractor; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.util.Preconditions; + +/** A timestamp extractor that looks for the SQL_TIMESTAMP "ts" field. */ +public class CustomExtractor extends TimestampExtractor { + private static final long serialVersionUID = 6784900460276023738L; + + private final String field = "ts"; + + @Override + public String[] getArgumentFields() { + return new String[] {field}; + } + + @Override + public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) { + if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) { + throw new ValidationException( + String.format( + "Field 'ts' must be of type Timestamp but is of type %s.", + argumentFieldTypes[0])); + } + } + + @Override + public Expression getExpression(ResolvedFieldReference[] fieldAccesses) { + ResolvedFieldReference fieldAccess = fieldAccesses[0]; + Preconditions.checkArgument(fieldAccess.resultType() == Types.SQL_TIMESTAMP); + FieldReferenceExpression fieldReferenceExpr = + new FieldReferenceExpression( + fieldAccess.name(), + TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()), + 0, + fieldAccess.fieldIndex()); + return ApiExpressionUtils.unresolvedCall( + BuiltInFunctionDefinitions.CAST, + fieldReferenceExpr, + ApiExpressionUtils.typeLiteral(DataTypes.BIGINT())); + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java new file mode 100644 index 0000000..191aa87 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.java.tuple.Tuple2; + +/** Utility class to make working with tuples more readable. */ +public class MaxAccumulator<T> extends Tuple2<T, Boolean> { + private static final long serialVersionUID = 6089142148200600733L; +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java new file mode 100644 index 0000000..a757fe9 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.ScalarFunction; + +import org.junit.Assert; + +/** + * Testing scalar function to verify that lifecycle methods are called in the expected order and + * only once. + */ +public class RichFunc0 extends ScalarFunction { + private static final long serialVersionUID = 931156471687322386L; + + private boolean openCalled = false; + private boolean closeCalled = false; + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + if (openCalled) { + Assert.fail("Open called more than once."); + } else { + openCalled = true; + } + if (closeCalled) { + Assert.fail("Close called before open."); + } + } + + public void eval(int index) { + if (!openCalled) { + Assert.fail("Open was not called before eval."); + } + if (closeCalled) { + Assert.fail("Close called before eval."); + } + } + + @Override + public void close() throws Exception { + super.close(); + if (closeCalled) { + Assert.fail("Close called more than once."); + } else { + closeCalled = true; + } + if (!openCalled) { + Assert.fail("Open was not called before close."); + } + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java new file mode 100644 index 0000000..9fed516 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.types.Row; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** A collector for storing results in memory. */ +class RowCollector { + private static final ArrayDeque<Tuple2<Boolean, Row>> sink = new ArrayDeque<>(); + + public static void addValue(Tuple2<Boolean, Row> value) { + synchronized (sink) { + sink.add(value.copy()); + } + } + + public static List<Tuple2<Boolean, Row>> getAndClearValues() { + final ArrayList<Tuple2<Boolean, Row>> out = new ArrayList<>(sink); + sink.clear(); + return out; + } + + public static List<String> retractResults(List<Tuple2<Boolean, Row>> results) { + final Map<String, Integer> retracted = + results.stream() + .collect( + Collectors.groupingBy( + r -> r.f1.toString(), + Collectors.mapping( + r -> r.f0 ? 1 : -1, + Collectors.reducing( + 0, (left, right) -> left + right)))); + + if (retracted.values().stream().anyMatch(c -> c < 0)) { + throw new AssertionError("Received retracted rows which have not been accumulated."); + } + + return retracted.entrySet().stream() + .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> e.getKey())) + .collect(Collectors.toList()); + } + + public static List<String> upsertResults(List<Tuple2<Boolean, Row>> results, int[] keys) { + final HashMap<Row, String> upserted = new HashMap<>(); + for (Tuple2<Boolean, Row> r : results) { + final Row key = Row.project(r.f1, keys); + if (r.f0) { + upserted.put(key, r.f1.toString()); + } else { + upserted.remove(key); + } + } + return new ArrayList<>(upserted.values()); + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java new file mode 100644 index 0000000..0051d9c --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.types.Row; + +/** A sink that stores data in the {@link RowCollector}. */ +public class RowSink implements SinkFunction<Tuple2<Boolean, Row>> { + private static final long serialVersionUID = -7264802354440479084L; + + @Override + public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception { + RowCollector.addValue(value); + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java new file mode 100644 index 0000000..4e34e65 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.table.functions.TableFunction; + +/** A function that splits strings, optionally adding a prefix. */ +public class TableFunc1 extends TableFunction<String> { + + private static final long serialVersionUID = -5471603822898040617L; + + public void eval(String str) { + if (str.contains("#")) { + for (String s : str.split("#")) { + collect(s); + } + } + } + + public void eval(String str, String prefix) { + if (str.contains("#")) { + for (String s : str.split("#")) { + collect(prefix + s); + } + } + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java new file mode 100644 index 0000000..cde5467 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +/** Testing append sink. */ +public class TestAppendSink implements AppendStreamTableSink<Row> { + + private String[] fNames = null; + private TypeInformation<?>[] fTypes = null; + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { + return dataStream + .map( + new MapFunction<Row, Tuple2<Boolean, Row>>() { + private static final long serialVersionUID = 4671583708680989488L; + + @Override + public Tuple2<Boolean, Row> map(Row value) throws Exception { + return Tuple2.of(true, value); + } + }) + .addSink(new RowSink()); + } + + @Override + public TypeInformation<Row> getOutputType() { + return new RowTypeInfo(fTypes, fNames); + } + + @Override + public String[] getFieldNames() { + return fNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fTypes; + } + + @Override + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + final TestAppendSink copy = new TestAppendSink(); + copy.fNames = fieldNames; + copy.fTypes = fieldTypes; + return copy; + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java new file mode 100644 index 0000000..dfa2db4 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.io.CollectionInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.LookupableTableSource; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** Factory for the testing sinks. */ +public class TestCollectionTableFactory + implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row> { + + private static boolean isStreaming = true; + + private static final List<Row> SOURCE_DATA = new LinkedList<>(); + private static final List<Row> DIM_DATA = new LinkedList<>(); + private static final List<Row> RESULT = new LinkedList<>(); + + private long emitIntervalMS = -1L; + + @Override + public TableSource<Row> createTableSource(Map<String, String> properties) { + return getCollectionSource(properties, isStreaming); + } + + @Override + public TableSink<Row> createTableSink(Map<String, String> properties) { + return getCollectionSink(properties); + } + + @Override + public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { + return getCollectionSource(properties, true); + } + + @Override + public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) { + return getCollectionSink(properties); + } + + private CollectionTableSource getCollectionSource( + Map<String, String> props, boolean isStreaming) { + final DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(props); + final TableSchema schema = properties.getTableSchema(Schema.SCHEMA); + final Optional<Integer> parallelism = properties.getOptionalInt("parallelism"); + return new CollectionTableSource(emitIntervalMS, schema, isStreaming, parallelism); + } + + private CollectionTableSink getCollectionSink(Map<String, String> props) { + final DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(props); + final TableSchema schema = properties.getTableSchema(Schema.SCHEMA); + return new CollectionTableSink((RowTypeInfo) schema.toRowType()); + } + + @Override + public Map<String, String> requiredContext() { + final HashMap<String, String> context = new HashMap<>(); + context.put(ConnectorDescriptorValidator.CONNECTOR, "COLLECTION"); + return context; + } + + @Override + public List<String> supportedProperties() { + return Arrays.asList("*"); + } + + private static class CollectionTableSource + implements StreamTableSource<Row>, LookupableTableSource<Row> { + private final long emitIntervalMs; + private final TableSchema schema; + private final boolean isStreaming; + private final Optional<Integer> parallelism; + private final TypeInformation<Row> rowType; + + private CollectionTableSource( + long emitIntervalMs, + TableSchema schema, + boolean isStreaming, + Optional<Integer> parallelism) { + this.emitIntervalMs = emitIntervalMs; + this.schema = schema; + this.isStreaming = isStreaming; + this.parallelism = parallelism; + this.rowType = schema.toRowType(); + } + + @Override + public boolean isBounded() { + return !isStreaming; + } + + @Override + public DataStream<Row> getDataStream(StreamExecutionEnvironment streamEnv) { + final DataStreamSource<Row> dataStream = + streamEnv.createInput( + new TestCollectionInputFormat<>( + emitIntervalMs, + SOURCE_DATA, + rowType.createSerializer(new ExecutionConfig())), + rowType); + if (parallelism.isPresent()) { + dataStream.setParallelism(parallelism.get()); + } + return dataStream; + } + + @Override + public TypeInformation<Row> getReturnType() { + return rowType; + } + + @Override + public TableSchema getTableSchema() { + return schema; + } + + @Override + public TableFunction<Row> getLookupFunction(String[] lookupKeys) { + final String[] schemaFieldNames = schema.getFieldNames(); + final int[] keys = + Arrays.stream(lookupKeys) + .map( + k -> { + for (int x = 0; x < schemaFieldNames.length; x++) { + if (k.equals(schemaFieldNames[x])) { + return x; + } + } + throw new IllegalStateException(); + }) + .mapToInt(i -> i) + .toArray(); + + return new TemporalTableFetcher(DIM_DATA, keys); + } + + @Override + public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) { + return null; + } + + @Override + public boolean isAsyncEnabled() { + return false; + } + } + + private static class CollectionTableSink implements AppendStreamTableSink<Row> { + private final RowTypeInfo outputType; + + private CollectionTableSink(RowTypeInfo outputType) { + this.outputType = outputType; + } + + @Override + public RowTypeInfo getOutputType() { + return outputType; + } + + @Override + public String[] getFieldNames() { + return outputType.getFieldNames(); + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return outputType.getFieldTypes(); + } + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { + return dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1); + } + + @Override + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + return this; + } + } + + private static class UnsafeMemorySinkFunction extends RichSinkFunction<Row> { + private static final long serialVersionUID = -7880686562734099699L; + + private final TypeInformation<Row> outputType; + private TypeSerializer<Row> serializer = null; + + private UnsafeMemorySinkFunction(TypeInformation<Row> outputType) { + this.outputType = outputType; + } + + @Override + public void open(Configuration parameters) throws Exception { + serializer = outputType.createSerializer(new ExecutionConfig()); + } + + @Override + public void invoke(Row row, Context context) throws Exception { + RESULT.add(serializer.copy(row)); + } + } + + private static class TestCollectionInputFormat<T> extends CollectionInputFormat<T> { + + private static final long serialVersionUID = -3222731547793350189L; + + private final long emitIntervalMs; + + public TestCollectionInputFormat( + long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> serializer) { + super(dataSet, serializer); + this.emitIntervalMs = emitIntervalMs; + } + + @Override + public boolean reachedEnd() throws IOException { + if (emitIntervalMs > 0) { + try { + Thread.sleep(emitIntervalMs); + } catch (InterruptedException e) { + } + } + return super.reachedEnd(); + } + } + + private static class TemporalTableFetcher extends TableFunction<Row> { + private static final long serialVersionUID = 6248306950388784015L; + + private final List<Row> dimData; + private final int[] keys; + + private TemporalTableFetcher(List<Row> dimData, int[] keys) { + this.dimData = dimData; + this.keys = keys; + } + + public void eval(Row values) { + for (Row data : dimData) { + boolean matched = true; + int idx = 0; + while (matched && idx < keys.length) { + final Object dimField = data.getField(keys[idx]); + final Object inputField = values.getField(idx); + matched = dimField.equals(inputField); + idx += 1; + } + if (matched) { + // copy the row data + final Row ret = new Row(data.getArity()); + for (int x = 0; x < data.getArity(); x++) { + ret.setField(x, data.getField(x)); + } + collect(ret); + } + } + } + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java new file mode 100644 index 0000000..449aea7 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +/** Testing retract sink. */ +public class TestRetractSink implements RetractStreamTableSink<Row> { + + private String[] fNames = null; + private TypeInformation<?>[] fTypes = null; + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { + return dataStream.addSink(new RowSink()); + } + + @Override + public TypeInformation<Row> getRecordType() { + return new RowTypeInfo(fTypes, fNames); + } + + @Override + public String[] getFieldNames() { + return fNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fTypes; + } + + @Override + public TableSink<Tuple2<Boolean, Row>> configure( + String[] fieldNames, TypeInformation<?>[] fieldTypes) { + final TestRetractSink copy = new TestRetractSink(); + copy.fNames = fieldNames; + copy.fTypes = fieldTypes; + return copy; + } +} diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java new file mode 100644 index 0000000..1740d53 --- /dev/null +++ b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.legacyutils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sinks.UpsertStreamTableSink; +import org.apache.flink.types.Row; + +import static org.assertj.core.api.Assertions.assertThat; + +/** testing upsert sink. */ +public class TestUpsertSink implements UpsertStreamTableSink<Row> { + + private String[] fNames = null; + private TypeInformation<?>[] fTypes = null; + + private final String[] expectedKeys; + private final boolean expectedIsAppendOnly; + + public TestUpsertSink(String[] expectedKeys, boolean expectedIsAppendOnly) { + this.expectedKeys = expectedKeys; + this.expectedIsAppendOnly = expectedIsAppendOnly; + } + + @Override + public void setKeyFields(String[] keys) { + assertThat(keys) + .as("Provided key fields do not match expected keys") + .containsExactlyInAnyOrder(expectedKeys); + } + + @Override + public void setIsAppendOnly(Boolean isAppendOnly) { + assertThat(isAppendOnly) + .as("Provided isAppendOnly does not match expected isAppendOnly") + .isEqualTo(expectedIsAppendOnly); + } + + @Override + public TypeInformation<Row> getRecordType() { + return new RowTypeInfo(fTypes, fNames); + } + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> s) { + return s.addSink(new RowSink()); + } + + @Override + public String[] getFieldNames() { + return fNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fTypes; + } + + @Override + public TableSink<Tuple2<Boolean, Row>> configure( + String[] fieldNames, TypeInformation<?>[] fieldTypes) { + final TestUpsertSink copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly); + copy.fNames = fieldNames; + copy.fTypes = fieldTypes; + return copy; + } +} diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala deleted file mode 100644 index 0d8e03a..0000000 --- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.legacyutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.java.io.CollectionInputFormat -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource} -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction -import org.apache.flink.table.api.TableSchema -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR -import org.apache.flink.table.descriptors.{DescriptorProperties, Schema} -import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory} -import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction} -import org.apache.flink.table.legacyutils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource} -import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink} -import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource, TableSource} -import org.apache.flink.types.Row - -import java.io.IOException -import java.util -import java.util.{Optional, ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap} - -import scala.collection.JavaConversions._ - -/** - * Testing utils adopted from legacy planner until the Python code is updated. - */ -@deprecated -class TestCollectionTableFactory - extends StreamTableSourceFactory[Row] - with StreamTableSinkFactory[Row] -{ - - override def createTableSource(properties: JMap[String, String]): TableSource[Row] = { - getCollectionSource(properties, TestCollectionTableFactory.isStreaming) - } - - override def createTableSink(properties: JMap[String, String]): TableSink[Row] = { - getCollectionSink(properties) - } - - override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = { - getCollectionSource(properties, isStreaming = true) - } - - override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = { - getCollectionSink(properties) - } - - override def requiredContext(): JMap[String, String] = { - val context = new util.HashMap[String, String]() - context.put(CONNECTOR, "COLLECTION") - context - } - - override def supportedProperties(): JList[String] = { - val supported = new JArrayList[String]() - supported.add("*") - supported - } -} - -@deprecated -object TestCollectionTableFactory { - var isStreaming: Boolean = true - - val SOURCE_DATA = new JLinkedList[Row]() - val DIM_DATA = new JLinkedList[Row]() - val RESULT = new JLinkedList[Row]() - private var emitIntervalMS = -1L - - def initData(sourceData: JList[Row], - dimData: JList[Row] = List(), - emitInterval: Long = -1L): Unit ={ - SOURCE_DATA.addAll(sourceData) - DIM_DATA.addAll(dimData) - emitIntervalMS = emitInterval - } - - def reset(): Unit ={ - RESULT.clear() - SOURCE_DATA.clear() - DIM_DATA.clear() - emitIntervalMS = -1L - } - - def getCollectionSource(props: JMap[String, String], - isStreaming: Boolean): CollectionTableSource = { - val properties = new DescriptorProperties() - properties.putProperties(props) - val schema = properties.getTableSchema(Schema.SCHEMA) - val parallelism = properties.getOptionalInt("parallelism") - new CollectionTableSource(emitIntervalMS, schema, isStreaming, parallelism) - } - - def getCollectionSink(props: JMap[String, String]): CollectionTableSink = { - val properties = new DescriptorProperties() - properties.putProperties(props) - val schema = properties.getTableSchema(Schema.SCHEMA) - new CollectionTableSink(schema.toRowType.asInstanceOf[RowTypeInfo]) - } - - /** - * Table source of collection. - */ - class CollectionTableSource( - val emitIntervalMs: Long, - val schema: TableSchema, - val isStreaming: Boolean, - val parallelism: Optional[Integer]) - extends StreamTableSource[Row] - with LookupableTableSource[Row] { - - private val rowType: TypeInformation[Row] = schema.toRowType - - override def isBounded: Boolean = !isStreaming - - override def getDataStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[Row] = { - val dataStream = streamEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs, - SOURCE_DATA, - rowType.createSerializer(new ExecutionConfig)), - rowType) - if (parallelism.isPresent) { - dataStream.setParallelism(parallelism.get()) - } - dataStream - } - - override def getReturnType: TypeInformation[Row] = rowType - - override def getTableSchema: TableSchema = { - schema - } - - override def getLookupFunction(lookupKeys: Array[String]): TemporalTableFetcher = { - new TemporalTableFetcher(DIM_DATA, lookupKeys.map(schema.getFieldNames.indexOf(_))) - } - - override def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[Row] = null - - override def isAsyncEnabled: Boolean = false - } - - /** - * Table sink of collection. - */ - class CollectionTableSink(val outputType: RowTypeInfo) - extends AppendStreamTableSink[Row] { - - override def getOutputType: RowTypeInfo = outputType - - override def getFieldNames: Array[String] = outputType.getFieldNames - - override def getFieldTypes: Array[TypeInformation[_]] = { - outputType.getFieldTypes - } - - override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { - dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1) - } - - override def configure(fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this - } - - /** - * Sink function of unsafe memory. - */ - class UnsafeMemorySinkFunction(outputType: TypeInformation[Row]) extends RichSinkFunction[Row] { - private var serializer: TypeSerializer[Row] = _ - - override def open(param: Configuration): Unit = { - serializer = outputType.createSerializer(new ExecutionConfig) - } - - @throws[Exception] - override def invoke(row: Row): Unit = { - RESULT.add(serializer.copy(row)) - } - } - - /** - * Collection inputFormat for testing. - */ - class TestCollectionInputFormat[T]( - val emitIntervalMs: Long, - val dataSet: java.util.Collection[T], - val serializer: TypeSerializer[T]) - extends CollectionInputFormat[T](dataSet, serializer) { - @throws[IOException] - override def reachedEnd: Boolean = { - if (emitIntervalMs > 0) { - try - Thread.sleep(emitIntervalMs) - catch { - case _: InterruptedException => - } - } - super.reachedEnd - } - } - - /** - * Dimension table source fetcher. - */ - class TemporalTableFetcher( - val dimData: JLinkedList[Row], - val keys: Array[Int]) extends TableFunction[Row] { - - @throws[Exception] - def eval(values: Any*): Unit = { - for (data <- dimData) { - var matched = true - var idx = 0 - while (matched && idx < keys.length) { - val dimField = data.getField(keys(idx)) - val inputField = values(idx) - matched = dimField.equals(inputField) - idx += 1 - } - if (matched) { - // copy the row data - val ret = new Row(data.getArity) - 0 until data.getArity foreach { idx => - ret.setField(idx, data.getField(idx)) - } - collect(ret) - } - } - } - } -} diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala deleted file mode 100644 index f08d214..0000000 --- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingDescriptors.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.legacyutils - -import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} -import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.table.api.{DataTypes, ValidationException} -import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression, FieldReferenceExpression, ResolvedFieldReference} -import org.apache.flink.table.functions.BuiltInFunctionDefinitions -import org.apache.flink.table.sources.tsextractors.TimestampExtractor -import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner -import org.apache.flink.table.types.utils.TypeConversions -import org.apache.flink.types.Row - -/* - * Testing utils adopted from legacy planner until the Python code is updated. - */ - -@deprecated -class CustomAssigner extends PunctuatedWatermarkAssigner() { - override def getWatermark(row: Row, timestamp: Long): Watermark = - throw new UnsupportedOperationException() -} - -@deprecated -class CustomExtractor(val field: String) extends TimestampExtractor { - def this() = { - this("ts") - } - - override def getArgumentFields: Array[String] = Array(field) - - override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = { - argumentFieldTypes(0) match { - case Types.SQL_TIMESTAMP => - case _ => - throw new ValidationException( - s"Field 'ts' must be of type Timestamp but is of type ${argumentFieldTypes(0)}.") - } - } - - override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = { - val fieldAccess = fieldAccesses(0) - require(fieldAccess.resultType == Types.SQL_TIMESTAMP) - val fieldReferenceExpr = new FieldReferenceExpression( - fieldAccess.name, - TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType), - 0, - fieldAccess.fieldIndex) - ApiExpressionUtils.unresolvedCall( - BuiltInFunctionDefinitions.CAST, - fieldReferenceExpr, - ApiExpressionUtils.typeLiteral(DataTypes.BIGINT())) - } - - override def equals(other: Any): Boolean = other match { - case that: CustomExtractor => field == that.field - case _ => false - } - - override def hashCode(): Int = { - field.hashCode - } -} diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala deleted file mode 100644 index 9c032ca..0000000 --- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingFunctions.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.legacyutils - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.TupleTypeInfo -import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction, TableFunction} -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} - -import java.lang.{Iterable => JIterable} -import org.junit.Assert - -/* - * Testing utils adopted from legacy planner until the Python code is updated. - */ - -@deprecated -class RichFunc0 extends ScalarFunction { - var openCalled = false - var closeCalled = false - - override def open(context: FunctionContext): Unit = { - super.open(context) - if (openCalled) { - Assert.fail("Open called more than once.") - } else { - openCalled = true - } - if (closeCalled) { - Assert.fail("Close called before open.") - } - } - - def eval(index: Int): Int = { - if (!openCalled) { - Assert.fail("Open was not called before eval.") - } - if (closeCalled) { - Assert.fail("Close called before eval.") - } - - index + 1 - } - - override def close(): Unit = { - super.close() - if (closeCalled) { - Assert.fail("Close called more than once.") - } else { - closeCalled = true - } - if (!openCalled) { - Assert.fail("Open was not called before close.") - } - } -} - -@deprecated -class MaxAccumulator[T] extends JTuple2[T, Boolean] - -@deprecated -abstract class MaxAggFunction[T](implicit ord: Ordering[T]) - extends AggregateFunction[T, MaxAccumulator[T]] { - - override def createAccumulator(): MaxAccumulator[T] = { - val acc = new MaxAccumulator[T] - acc.f0 = getInitValue - acc.f1 = false - acc - } - - def accumulate(acc: MaxAccumulator[T], value: Any): Unit = { - if (value != null) { - val v = value.asInstanceOf[T] - if (!acc.f1 || ord.compare(acc.f0, v) < 0) { - acc.f0 = v - acc.f1 = true - } - } - } - - override def getValue(acc: MaxAccumulator[T]): T = { - if (acc.f1) { - acc.f0 - } else { - null.asInstanceOf[T] - } - } - - def merge(acc: MaxAccumulator[T], its: JIterable[MaxAccumulator[T]]): Unit = { - val iter = its.iterator() - while (iter.hasNext) { - val a = iter.next() - if (a.f1) { - accumulate(acc, a.f0) - } - } - } - - def resetAccumulator(acc: MaxAccumulator[T]): Unit = { - acc.f0 = getInitValue - acc.f1 = false - } - - override def getAccumulatorType: TypeInformation[MaxAccumulator[T]] = { - new TupleTypeInfo( - classOf[MaxAccumulator[T]], - getValueTypeInfo, - BasicTypeInfo.BOOLEAN_TYPE_INFO) - } - - def getInitValue: T - - def getValueTypeInfo: TypeInformation[_] -} - -@deprecated -class ByteMaxAggFunction extends MaxAggFunction[Byte] { - override def getInitValue: Byte = 0.toByte - override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO -} - -@deprecated -class TableFunc1 extends TableFunction[String] { - def eval(str: String): Unit = { - if (str.contains("#")){ - str.split("#").foreach(collect) - } - } - - def eval(str: String, prefix: String): Unit = { - if (str.contains("#")) { - str.split("#").foreach(s => collect(prefix + s)) - } - } -} diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala deleted file mode 100644 index 871f6a1..0000000 --- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/legacyTestingSinks.scala +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.legacyutils - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} -import org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} -import org.apache.flink.types.Row - -import java.lang.{Boolean => JBool} - -import scala.collection.mutable - -/* - * Testing utils adopted from legacy planner until the Python code is updated. - */ - -@deprecated -private[flink] class TestAppendSink extends AppendStreamTableSink[Row] { - - var fNames: Array[String] = _ - var fTypes: Array[TypeInformation[_]] = _ - - override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { - dataStream.map( - new MapFunction[Row, JTuple2[JBool, Row]] { - override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value) - }) - .addSink(new RowSink) - } - - override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) - - override def getFieldNames: Array[String] = fNames - - override def getFieldTypes: Array[TypeInformation[_]] = fTypes - - override def configure( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = { - val copy = new TestAppendSink - copy.fNames = fieldNames - copy.fTypes = fieldTypes - copy - } -} - -@deprecated -private[flink] class TestRetractSink extends RetractStreamTableSink[Row] { - - var fNames: Array[String] = _ - var fTypes: Array[TypeInformation[_]] = _ - - override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] = { - s.addSink(new RowSink) - } - - override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) - - override def getFieldNames: Array[String] = fNames - - override def getFieldTypes: Array[TypeInformation[_]] = fTypes - - override def configure( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = { - val copy = new TestRetractSink - copy.fNames = fieldNames - copy.fTypes = fieldTypes - copy - } -} - -@deprecated -private[flink] class TestUpsertSink( - expectedKeys: Array[String], - expectedIsAppendOnly: Boolean) - extends UpsertStreamTableSink[Row] { - - var fNames: Array[String] = _ - var fTypes: Array[TypeInformation[_]] = _ - - override def setKeyFields(keys: Array[String]): Unit = - if (keys != null) { - if (!expectedKeys.sorted.mkString(",").equals(keys.sorted.mkString(","))) { - throw new AssertionError("Provided key fields do not match expected keys") - } - } else { - if (expectedKeys != null) { - throw new AssertionError("Provided key fields should not be null.") - } - } - - override def setIsAppendOnly(isAppendOnly: JBool): Unit = - if (expectedIsAppendOnly != isAppendOnly) { - throw new AssertionError("Provided isAppendOnly does not match expected isAppendOnly") - } - - override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) - - override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] = { - s.addSink(new RowSink) - } - - override def getFieldNames: Array[String] = fNames - - override def getFieldTypes: Array[TypeInformation[_]] = fTypes - - override def configure( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = { - val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly) - copy.fNames = fieldNames - copy.fTypes = fieldTypes - copy - } -} - -@deprecated -class RowSink extends SinkFunction[JTuple2[JBool, Row]] { - override def invoke(value: JTuple2[JBool, Row]): Unit = RowCollector.addValue(value) -} - -@deprecated -object RowCollector { - private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] = - new mutable.ArrayBuffer[JTuple2[JBool, Row]]() - - def addValue(value: JTuple2[JBool, Row]): Unit = { - - // make a copy - val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1)) - sink.synchronized { - sink += copy - } - } - - def getAndClearValues: List[JTuple2[JBool, Row]] = { - val out = sink.toList - sink.clear() - out - } - - /** Converts a list of retraction messages into a list of final results. */ - def retractResults(results: List[JTuple2[JBool, Row]]): List[String] = { - - val retracted = results - .foldLeft(Map[String, Int]()){ (m: Map[String, Int], v: JTuple2[JBool, Row]) => - val cnt = m.getOrElse(v.f1.toString, 0) - if (v.f0) { - m + (v.f1.toString -> (cnt + 1)) - } else { - m + (v.f1.toString -> (cnt - 1)) - } - }.filter{ case (_, c: Int) => c != 0 } - - if (retracted.exists{ case (_, c: Int) => c < 0}) { - throw new AssertionError("Received retracted rows which have not been accumulated.") - } - - retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList - } - - /** Converts a list of upsert messages into a list of final results. */ - def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = { - - def getKeys(r: Row): Row = Row.project(r, keys) - - val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], r) => - val key = getKeys(r.f1) - if (r.f0) { - o + (key -> r.f1.toString) - } else { - o - key - } - } - - upserted.values.toList - } -}
