This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 567a887  [BAHIR-257] RedisDescriptorTest add TableFactory sql operate 
test (#108)
567a887 is described below

commit 567a887fbfb6a95f870be50584d3fce820a76af1
Author: housezhang <[email protected]>
AuthorDate: Fri Jan 22 22:21:27 2021 +0800

    [BAHIR-257] RedisDescriptorTest add TableFactory sql operate test (#108)
    
    Co-authored-by: house.zhang
---
 .../connectors/redis/RedisDescriptorTest.java       | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index 0e4ef17..3b48ea2 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
 import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.types.Row;
@@ -71,6 +72,26 @@ public class RedisDescriptorTest extends  RedisITCaseBase{
         tableEnvironment.executeSql("insert into redis select k, v from t1");
     }
 
+    @Test
+    public void testRedisTableFactory() throws Exception {
+        DataStreamSource<Row> source = (DataStreamSource<Row>) 
env.addSource(new TestSourceFunctionString())
+                .returns(new RowTypeInfo(TypeInformation.of(String.class), 
TypeInformation.of(Long.class)));
+
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useOldPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Table table = tableEnv.fromDataStream(source);
+        tableEnv.createTemporaryView("t1", table);
+
+        tableEnv.executeSql("CREATE TABLE redis (key  STRING, number BIGINT) 
WITH ('connector.type'='redis'," +
+                "'redis-mode'='cluster', 'key.ttl' = 
'70000','command'='INCRBY_EX','cluster-nodes'='" + REDIS_HOST + ":" + 
REDIS_PORT + "')");
+
+        tableEnv.executeSql("insert into redis select * from t1");
+
+    }
 
     private static class TestSourceFunctionString implements 
SourceFunction<Row> {
         private static final long serialVersionUID = 1L;

Reply via email to