This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 567a887 [BAHIR-257] RedisDescriptorTest add TableFactory sql operate
test (#108)
567a887 is described below
commit 567a887fbfb6a95f870be50584d3fce820a76af1
Author: housezhang <[email protected]>
AuthorDate: Fri Jan 22 22:21:27 2021 +0800
[BAHIR-257] RedisDescriptorTest add TableFactory sql operate test (#108)
Co-authored-by: house.zhang
---
.../connectors/redis/RedisDescriptorTest.java | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index 0e4ef17..3b48ea2 100644
---
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
@@ -71,6 +72,26 @@ public class RedisDescriptorTest extends RedisITCaseBase{
tableEnvironment.executeSql("insert into redis select k, v from t1");
}
+ @Test
+ public void testRedisTableFactory() throws Exception {
+ DataStreamSource<Row> source = (DataStreamSource<Row>)
env.addSource(new TestSourceFunctionString())
+ .returns(new RowTypeInfo(TypeInformation.of(String.class),
TypeInformation.of(Long.class)));
+
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useOldPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Table table = tableEnv.fromDataStream(source);
+ tableEnv.createTemporaryView("t1", table);
+
+ tableEnv.executeSql("CREATE TABLE redis (key STRING, number BIGINT)
WITH ('connector.type'='redis'," +
+ "'redis-mode'='cluster', 'key.ttl' =
'70000','command'='INCRBY_EX','cluster-nodes'='" + REDIS_HOST + ":" +
REDIS_PORT + "')");
+
+ tableEnv.executeSql("insert into redis select * from t1");
+
+ }
private static class TestSourceFunctionString implements
SourceFunction<Row> {
private static final long serialVersionUID = 1L;