[ https://issues.apache.org/jira/browse/BAHIR-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zl updated BAHIR-232: --------------------- Description: flink-connector-activemq does not support Flink Table API & SQL, based on the the existing code, it is not very difficult to support this feature, we just need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory and AMQTableSinkFactory. Then we can connect activemq by the following way: {code:java} String TABLE_CREATE_SQL = "CREATE TABLE books (" + " id int, " + " title varchar, " + " author varchar, " + " price double, " + " qty int " + ") with (" + " 'connector.type' = 'activemq', " + " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " + " 'connector.destination-type' = 'QUEUE', " + " 'connector.destination-name' = 'source_queue' " + ")"; String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" + "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" + "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" + "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" + "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" + "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" + "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" + "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)"; String QUERY_TABLE_SQL = "SELECT * FROM books"; // create activemq source table tEnv.sqlUpdate(TABLE_CREATE_SQL); // produce event to activemq tEnv.sqlUpdate(INITIALIZE_TABLE_SQL); // consume from activemq Table table = tEnv.sqlQuery(QUERY_TABLE_SQL); {code} was: flink-connector-activemq does not support Flink Table API & SQL, based on the the existing code, it is not very difficult to support this feature, we just need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory and AMQTableSourceFactory. Then we can connect activemq by the following way: {code:java} String TABLE_CREATE_SQL = "CREATE TABLE books (" + " id int, " + " title varchar, " + " author varchar, " + " price double, " + " qty int " + ") with (" + " 'connector.type' = 'activemq', " + " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " + " 'connector.destination-type' = 'QUEUE', " + " 'connector.destination-name' = 'source_queue' " + ")"; String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" + "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" + "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" + "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" + "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" + "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" + "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" + "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)"; String QUERY_TABLE_SQL = "SELECT * FROM books"; // create activemq source table tEnv.sqlUpdate(TABLE_CREATE_SQL); // produce event to activemq tEnv.sqlUpdate(INITIALIZE_TABLE_SQL); // consume from activemq Table table = tEnv.sqlQuery(QUERY_TABLE_SQL); {code} > Support Flink Table API & SQL for flink-connector-activemq > ---------------------------------------------------------- > > Key: BAHIR-232 > URL: https://issues.apache.org/jira/browse/BAHIR-232 > Project: Bahir > Issue Type: Improvement > Components: Flink Streaming Connectors > Affects Versions: Flink-1.0 > Reporter: zl > Priority: Major > > flink-connector-activemq does not support Flink Table API & SQL, based on the > the existing code, it is not very difficult to support this feature, we just > need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory > and AMQTableSinkFactory. Then we can connect activemq by the following way: > {code:java} > String TABLE_CREATE_SQL = "CREATE TABLE books (" + > " id int, " + > " title varchar, " + > " author varchar, " + > " price double, " + > " qty int " + > ") with (" + > " 'connector.type' = 'activemq', " + > " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', > " + > " 'connector.destination-type' = 'QUEUE', " + > " 'connector.destination-name' = 'source_queue' " + > ")"; > String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" + > "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" + > "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" + > "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" + > "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" + > "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" + > "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" + > "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" + > "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" + > "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" + > "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)"; > String QUERY_TABLE_SQL = "SELECT * FROM books"; > // create activemq source table > tEnv.sqlUpdate(TABLE_CREATE_SQL); > // produce event to activemq > tEnv.sqlUpdate(INITIALIZE_TABLE_SQL); > // consume from activemq > Table table = tEnv.sqlQuery(QUERY_TABLE_SQL); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)