Hi Ramya
I just tried to code the example which is worked in 1.10 which I using a custom
RichFlatMapFunction to connect ,transform data and release the conn in its
override method.
// app.java
public class RedisMapDemo {
public static void main(String[] args) throws Exception {
// 1. source
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
DataStream<String> sourceStream = env.fromElements("test_value");
// 2. custom map function
DataStream<String> redisUpdatedStream = sourceStream.flatMap(new
RedisFlatMap());
redisUpdatedStream.print();
env.execute("testing redis flatmap");
}
}
// this should be saved as another java file (RedisFlatMap.java)
public class RedisFlatMap extends RichFlatMapFunction<String, String> {
String TEST_REDIS_KEY = "my_first_lettuce_key";
RedisClient redisClient;
StatefulRedisConnection<String, String> connection;
RedisCommands<String, String> syncCommands;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
redisClient = RedisClient.create("redis://localhost:6379/0");
connection = redisClient.connect();
syncCommands = connection.sync();
}
@Override
public void close() throws Exception {
super.close();
// maybe release conn here ?
connection.close();
redisClient.shutdown();
}
@Override
public void flatMap(String inputString, Collector<String> out)
throws Exception {
// 1. write to redis
// syncCommands.set(TEST_REDIS_KEY, " Hello, Redis!");
// 2. read from redis
String tmpValue = syncCommands.get(TEST_REDIS_KEY);
// 3. transform
out.collect(inputString + " - " + tmpValue);
}
}
-----邮件原件-----
发件人: Ramya Ramamurthy [mailto:[email protected]]
发送时间: 2020年7月21日 星期二 18:42
收件人: [email protected]
主题: Flink Redis connectivity
Hi,
As per the understanding we have from the documentation, I guess its not
possible to take the redis connection within the Data Stream. In that case, how
should i proceed ? How can i access a DB client object within the stream ??
I am using Flink 1.7. any help here would be appreciated. Thanks.
RedisClient redisClient = new
RedisClient(RedisURI.create("redis://localhost:6379"));
RedisConnection<String, String> client = redisClient.connect();
DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
String>) value -> {
String ct = value.getField(5).toString();
String res = "";
if (ct.equals("14") || ct.equals("4")) {
res = client.set("key", "val");
}
return res;
});
Thanks,