This is an automated email from the ASF dual-hosted git repository. eskabetxe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit 3e24c95be17ead8008ebe67655eb41b7a103752d Author: min <[email protected]> AuthorDate: Thu May 19 16:39:31 2022 +0800 feat: add dynamicSourceSinkFactory SPI Class&add dynamicSink test --- .../org.apache.flink.table.factories.Factory | 16 ++++ .../kudu/table/dynamic/KuduDynamicSinkTest.java | 92 ++++++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000..6da911f --- /dev/null +++ b/flink-connector-kudu/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory \ No newline at end of file diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSinkTest.java new file mode 100644 index 0000000..f74a553 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSinkTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.table.dynamic; + +import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.KuduTestBase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Unit Tests for {@link KuduDynamicTableSink}. + */ +public class KuduDynamicSinkTest extends KuduTestBase { + public final static String INPUT_TABLE = "books"; + public static StreamExecutionEnvironment env; + public static TableEnvironment tEnv; + + @BeforeEach + public void init() { + KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true); + setUpDatabase(tableInfo); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env); + } + + @AfterEach + public void clean() { + KuduTableInfo tableInfo = booksTableInfo(INPUT_TABLE, true); + cleanDatabase(tableInfo); + } + + @Test + public void testKuduSSink() throws Exception { + // "id", "title", "author", "price", "quantity" + tEnv.executeSql( + "CREATE TABLE " + + INPUT_TABLE + + "(" + + "id int," + + "title string," + + "author string," + + "price double," + + "quantity int" + + ") WITH (" + + " 'connector'='kudu'," + + " 'kudu.master'='" + + getMasterAddress() + + "'," + + " 'kudu.table'='" + + INPUT_TABLE + + "','kudu.primary-key-columns'='id'" + + "','kudu.max-buffer-size'='1024'" + + "','kudu.flush-interval'='1000'" + + "','kudu.operation-timeout'='500'" + + "','kudu.ignore-not-found'='true'" + + "','kudu.ignore-not-found'='true'" + + ")"); + + tEnv.executeSql("insert into " + INPUT_TABLE + " values(1006,'test title','test author',10.1,10)"); + CloseableIterator<Row> collected = tEnv.executeSql("select * from " + INPUT_TABLE + " where id =1006").collect(); + assertNotNull(collected); + } +}
