nicole Wang created BAHIR-258:
---------------------------------

Summary: Add NebulaGraph Connector for Flink
Key: BAHIR-258
URL: https://issues.apache.org/jira/browse/BAHIR-258
Project: Bahir
Issue Type: New Feature
Components: Flink Streaming Connectors
Affects Versions: Flink-1.0, Flink-Next
Reporter: nicole Wang
Fix For: Flink-1.0
NebulaGraph ([https://nebula-graph.io/]) is a graph database built to handle super-large-scale graphs with millisecond latency. It is open source, distributed, scalable, and lightning fast. NebulaGraph source code: [https://github.com/vesoft-inc]

Graph databases are now widely used in real-time recommendation, knowledge graphs, financial risk control, and other fields, and these scenarios can use Flink to process data in real time. To enrich Flink's data ecosystem and make it easier for users to apply a graph database in real-time systems, we propose integrating NebulaGraph into Apache Flink.

We add a source and a sink to access NebulaGraph from Flink:

1. Source
{code:java}
// JDK and Flink classes; the Nebula* classes, ExecutionOptions and BaseTableRow are provided by the proposed connector
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// options to connect to NebulaGraph (address of the meta service)
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:45500")
        .build();

// NebulaGraph storage connection provider used by the source
NebulaStorageConnectionProvider storageConnectionProvider =
        new NebulaStorageConnectionProvider(nebulaClientOptions);

// options for the NebulaGraph source: read the "name" and "age" properties
// of tag "player" in graph space "flinkSource"
ExecutionOptions vertexExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSource")
        .setTag("player")
        .setNoColumn(false)
        .setFields(Arrays.asList("name", "age"))
        .setLimit(100)
        .builder();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
        .setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
{code}

2. Sink
{code:java}
// options to connect to NebulaGraph (address of the meta service)
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:45500")
        .build();

// NebulaGraph graph and meta connection providers used by the sink
NebulaGraphConnectionProvider graphConnectionProvider =
        new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
        new NebulaMetaConnectionProvider(nebulaClientOptions);

// options for the NebulaGraph sink: write "player" vertices into graph space "flinkSink";
// the vertex ID is taken from position 0 of each record, "name" and "age" from
// positions 1 and 2, and records are written in batches of 2
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("player")
        .setIdIndex(0)
        .setFields(Arrays.asList("name", "age"))
        .setPositions(Arrays.asList(1, 2))
        .setBatch(2)
        .builder();

NebulaBatchOutputFormat outputFormat =
        new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                .setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outputFormat);

// dataSource is the upstream DataStream whose records are written to NebulaGraph
dataSource.addSink(nebulaSinkFunction);
{code}
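To make the sink options above more concrete, the following stdlib-only Java sketch shows one way a batching vertex sink could turn {{idIndex}}, {{fields}}, {{positions}} and {{batch}} into nGQL {{INSERT VERTEX <tag>(<props>) VALUES <vid>:(<values>), ...}} statements. It is hypothetical: the class {{VertexBatchSketch}} and its methods are invented for this illustration, records are modeled as plain {{List}}s rather than Flink rows, vertex IDs are assumed to be strings, and it does not reflect how the proposed connector is actually implemented.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch, not the connector's code: buffers records and emits one
 * nGQL INSERT VERTEX statement per batch, using idIndex/fields/positions/batch
 * options shaped like the ones in the sink example.
 */
public class VertexBatchSketch {
    private final String tag;
    private final int idIndex;
    private final List<String> fields;
    private final List<Integer> positions;
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    public VertexBatchSketch(String tag, int idIndex, List<String> fields,
                             List<Integer> positions, int batchSize) {
        if (batchSize < 1 || fields.size() != positions.size()) {
            throw new IllegalArgumentException("need batchSize >= 1 and one position per field");
        }
        this.tag = tag;
        this.idIndex = idIndex;
        this.fields = new ArrayList<>(fields);
        this.positions = new ArrayList<>(positions);
        this.batchSize = batchSize;
    }

    // Buffers one record as vid:(values); flushes once batchSize records are pending.
    public void write(List<?> row) {
        String values = positions.stream()
                .map(p -> literal(row.get(p)))
                .collect(Collectors.joining(", "));
        pending.add(literal(row.get(idIndex)) + ":(" + values + ")");
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Joins the pending records into one statement; a real sink would execute it.
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        emitted.add("INSERT VERTEX " + tag + "(" + String.join(", ", fields) + ") VALUES "
                + String.join(", ", pending) + ";");
        pending.clear();
    }

    // Statements produced so far (a copy, so callers cannot modify internal state).
    public List<String> statements() {
        return new ArrayList<>(emitted);
    }

    // Strings become double-quoted literals (naive escaping); other values use toString().
    private static String literal(Object v) {
        return v instanceof String
                ? "\"" + ((String) v).replace("\"", "\\\"") + "\""
                : String.valueOf(v);
    }

    public static void main(String[] args) {
        // Same shape as the sink example: ID at position 0, name/age at 1 and 2, batch of 2.
        VertexBatchSketch sink =
                new VertexBatchSketch("player", 0, List.of("name", "age"), List.of(1, 2), 2);
        sink.write(List.of("p1", "Tom", 18));
        sink.write(List.of("p2", "Amy", 20));
        sink.write(List.of("p3", "Bob", 22));
        sink.flush(); // write the final, partial batch
        sink.statements().forEach(System.out::println);
    }
}
{code}

Running {{main}} prints {{INSERT VERTEX player(name, age) VALUES "p1":("Tom", 18), "p2":("Amy", 20);}} followed by {{INSERT VERTEX player(name, age) VALUES "p3":("Bob", 22);}}. The explicit final {{flush()}} matters: a production sink would typically also flush on checkpoints and on close so that a partial batch is not lost.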