[ 
https://issues.apache.org/jira/browse/BAHIR-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated BAHIR-232:
---------------------
    Description: 
flink-connector-activemq does not support the Flink Table API & SQL. Based on 
the existing code, adding this support is not very difficult. We just need to 
provide four classes: AMQTableSource, AMQTableSink, AMQTableSourceFactory, 
and AMQTableSinkFactory. We can then connect to ActiveMQ as follows:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" +
        " id int, " +
        " title varchar, " +
        " author varchar, " +
        " price double, " +
        " qty int " +
        ") with (" +
        " 'connector.type' = 'activemq', " +
        " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
        " 'connector.destination-type' = 'QUEUE', " +
        " 'connector.destination-name' = 'source_queue' " +
        ")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
        "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
        "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
        "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
        "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
        "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
        "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
        "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
        "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
        "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
        "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);

// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);

// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
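As a rough illustration of what the two table factories would have to do with the WITH properties above, the following sketch validates them before a source or sink is built. The class AmqTableOptions and its fromProperties method are hypothetical names, not part of flink-connector-activemq, and the accepted destination types (QUEUE, TOPIC) are an assumption. It uses only the JDK:
{code:java}
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical holder for the 'connector.*' properties used in the DDL above. */
final class AmqTableOptions {
    final String brokerUrl;
    final String destinationType; // assumed to be "QUEUE" or "TOPIC"
    final String destinationName;

    private AmqTableOptions(String brokerUrl, String destinationType, String destinationName) {
        this.brokerUrl = brokerUrl;
        this.destinationType = destinationType;
        this.destinationName = destinationName;
    }

    /** Validates the properties and fails fast on a missing or unsupported value. */
    static AmqTableOptions fromProperties(Map<String, String> props) {
        if (!"activemq".equals(props.get("connector.type"))) {
            throw new IllegalArgumentException("connector.type must be 'activemq'");
        }
        String url = required(props, "connector.broker-url");
        String name = required(props, "connector.destination-name");
        String type = required(props, "connector.destination-type").toUpperCase(Locale.ROOT);
        if (!type.equals("QUEUE") && !type.equals("TOPIC")) {
            throw new IllegalArgumentException("Unsupported destination type: " + type);
        }
        return new AmqTableOptions(url, type, name);
    }

    /** Returns the trimmed value for key, or throws if it is absent or blank. */
    private static String required(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Same property values as in the CREATE TABLE statement above.
        Map<String, String> props = new HashMap<>();
        props.put("connector.type", "activemq");
        props.put("connector.broker-url", "vm://localhost?broker.persistent=false");
        props.put("connector.destination-type", "QUEUE");
        props.put("connector.destination-name", "source_queue");

        AmqTableOptions options = fromProperties(props);
        System.out.println(options.destinationType + " " + options.destinationName);
    }
}
{code}
Failing fast here would surface a misspelled property when the table is created rather than when the job first connects to the broker.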
 

  was:
flink-connector-activemq does not support Flink Table API & SQL, based on the 
the existing code, it is not very difficult to support this feature, we just 
need to provide 4 classes: AMQTableSource、AMQTableSink、AMQTableSourceFactory 
and AMQTableSourceFactory. Then we can connect activemq by the following way:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books (" +
        " id int, " +
        " title varchar, " +
        " author varchar, " +
        " price double, " +
        " qty int " +
        ") with (" +
        " 'connector.type' = 'activemq', " +
        " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
        " 'connector.destination-type' = 'QUEUE', " +
        " 'connector.destination-name' = 'source_queue' " +
        ")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
        "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
        "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
        "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
        "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
        "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
        "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
        "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
        "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
        "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
        "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);

// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);

// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
 


> Support Flink Table API & SQL for flink-connector-activemq
> ----------------------------------------------------------
>
>                 Key: BAHIR-232
>                 URL: https://issues.apache.org/jira/browse/BAHIR-232
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: zl
>            Priority: Major
>
> flink-connector-activemq does not support the Flink Table API & SQL. Based on 
> the existing code, adding this support is not very difficult. We just need to 
> provide four classes: AMQTableSource, AMQTableSink, AMQTableSourceFactory, 
> and AMQTableSinkFactory. We can then connect to ActiveMQ as follows:
> {code:java}
> String TABLE_CREATE_SQL = "CREATE TABLE books (" +
>         " id int, " +
>         " title varchar, " +
>         " author varchar, " +
>         " price double, " +
>         " qty int " +
>         ") with (" +
>         " 'connector.type' = 'activemq', " +
>         " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
>         " 'connector.destination-type' = 'QUEUE', " +
>         " 'connector.destination-name' = 'source_queue' " +
>         ")";
> String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
>         "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
>         "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
>         "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
>         "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
>         "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
>         "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
>         "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
>         "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
>         "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
>         "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";
> String QUERY_TABLE_SQL = "SELECT * FROM books";
> // create activemq source table
> tEnv.sqlUpdate(TABLE_CREATE_SQL);
> // produce event to activemq
> tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);
> // consume from activemq
> Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
