Hi All,

Please find below a sample Siddhi file and the related exception. Any comments are highly appreciated.

First the exception:

Caused by: java.sql.BatchUpdateException: Batch entry 0 UPDATE my_test_table SET occurrence = (my_test_table.occurrence + '_str' ) WHERE (my_test_table.id = 2 ) was aborted: ERROR: operator does not exist: bigint + character varying
  Hint: No operator matches the given name and argument type(s). You might need to add explicit type casts.
  Position: 65  Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:166)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:490)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:835)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1556)
    at com.zaxxer.hikari.proxy.StatementProxy.executeBatch(StatementProxy.java:127)
    at com.zaxxer.hikari.proxy.HikariPreparedStatementProxy.executeBatch(HikariPreparedStatementProxy.java)
    at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable.batchProcessSQLUpdates(RDBMSEventTable.java:558)

Siddhi file to reproduce the exception:

---
@App:name("Test App")
@App:description("App_Description")

-- Please refer to http://wso2.github.io/siddhi/documentation/siddhi-4.0/

-- insert into my_test_table (id, occurrence, some_key) values(1, 1, 'o');
-- insert into my_test_table (id, occurrence, some_key) values(2, 1000, 'nok');

define trigger TriggerStream at every 10 sec;

@Store(type = "rdbms", jdbc.url = "jdbc:postgresql://localhost:5432/my_db", username = "my_user", password = "my_password" , jdbc.driver.name = "org.postgresql.Driver", table.name = 'my_test_table')
@primaryKey('id')
define table TestTable(id long, occurrence long, some_key string);

@sink(type='log')
define stream UpdateNumeric(triggered_time long, id long, occurrence long, some_key string);

@sink(type='log')
define stream UpdateString(triggered_time long, id long, occurrence long, some_key string);

from TriggerStream left outer join TestTable on TestTable.id==1
select *
insert into UpdateNumeric;

from TriggerStream left outer join TestTable on TestTable.id==2
select *
insert into UpdateString;

from UpdateNumeric
select *
update TestTable
    set TestTable.occurrence = TestTable.occurrence + 1
    on TestTable.id == id;

from UpdateString
select *
update TestTable
    set TestTable.some_key = TestTable.some_key + "_str"
    on TestTable.id == id;
---

Best regards,
Bahtiyar.

>
>> On 4 Apr 2018, at 17:27, Bahtiyar KARANLIK <bahti...@karanlik.org <mailto:bahti...@karanlik.org>> wrote:
>>
>> Hi All,
>>
>> I'm working on an "alarm management" test app. The idea is as follows:
>>
>> - I have an EventStream with the following important fields:
>> -- event_type = type of the event, e.g. threshold_exceeded_event
>> -- event_source = the entity that is going to be monitored, e.g.
>> region1.router1.cpu_utilization
>> -- level = severity of the Event (i.e. 0 -> Info, 1 -> warning, 2 -> critical)
>> -- level_changed = boolean flag (whether the level has changed from the
>> previous event item, e.g. if level is info at t0 and warning at t1, then the t1
>> event has this level_changed flag set to true)
>>
>> Then my algorithm comes in:
>>
>> 1) if the level_changed flag is false and the level is not Info;
>> --> Then I should have an "Active" (status == 1) alarm in the DB, and should
>> update its occurrence count.
>>
>> 2) if the level_changed flag is true;
>> 2.a) I'm going to "clear" the alarm in the DB table (keywords: RDBMS Store
>> Extension + PostgreSQL)
>> 2.b) check the 'level' field, and if it is not "Info" (==0), then create a NEW
>> alarm in the table with the current level (level = severity).
>>
>> Here starts the problem;
>>
>> for events with "level_changed" set to true, the update of the active Alarm's
>> status field is not executed at all!! I can create Alarms (AlarmsToCreate) and
>> increment their occurrence count (AlarmsToIncrement), but cannot set their
>> status fields (AlarmsToClear).
>>
>> I guess there is a problem with multiple DB hits (i.e.
>> clear the Alarm, setting status to 0 for the alarm with id 3, and create a new Alarm) for the same event
>> (within a stream). I can see that the logic decides that it needs to
>> clear an alarm and create a new one (the AlarmsToClear and
>> AlarmsToCreate logs are both there; please check the logs below for further info).
>>
>> Any comments or guidance are highly appreciated, since I have been stuck on this
>> issue for two weekends now :-( Either there is something I'm
>> completely missing about stream processing and DB hits, or there is a
>> bug somewhere (probably in the store-rdbms extension's
>> update-SQL compile logic).
>>
>> Best regards,
>>
>> Please find below the related part of my Siddhi file:
>>
>> ---------------------------------------------------------------
>> -- SELECT LEVEL CHANGED EVENTS
>>
>> @info(name = 'select_level_changed_events')
>> -- from EventStream[cast(map:get(EventStream.context, 'level_changed'), 'string') == 'true' and cast(map:get(EventStream.context, 'old_level'), 'string') != '0']
>> from EventStream[cast(map:get(context, 'level_changed'), 'string') == 'true']
>> --select EventStream.tenant, EventStream.event_type, EventStream.event_uei, EventStream.event_source, EventStream.proposed_action, EventStream.context as event_context
>> select *
>> insert into TempLevelChangedEvents;
>>
>> ---------------------------------------------------------------
>> -- CLEAR ALARMS
>> @info(name='select_alarms_to_clear')
>> from TempLevelChangedEvents left outer join AlarmTable[alarm_status==1]
>>     on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type
>>         and AlarmTable.alarmed_resource == event_source)
>> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity,
>>     AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,
>>     AlarmTable.source_time
>> insert into AlarmsToClear;
>>
>> @info(name = 'clear_alarms')
>> from AlarmsToClear
>> select *
>> update AlarmTable
>>     set AlarmTable.occurrence = AlarmTable.occurrence, AlarmTable.alarm_status = 0
>>     on AlarmTable.id == id;
>>
>> ---------------------------------------------------------------
>> ---------------------------------------------------------------
>> -- CREATE ALARMS
>>
>> -- If level has changed to a NON-INFO severity/level, then we should create a new Alarm !
>> @info(name = 'select_events_to_create_alarms')
>> from TempLevelChangedEvents[convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int') != 0]
>> select cast(-1, 'long') as id, cast(1, 'long') as alarm_status,
>>     convert(cast(map:get(context, 'level'), 'string'), 'int') as severity,
>>     event_type as alarm_key, event_source as alarmed_resource, cast(1, 'long')
>>     as occurrence, eventTimestamp() as source_time
>> insert into AlarmsToCreate;
>>
>> @info(name='create_alarms')
>> from AlarmsToCreate left outer join NextAlarmId
>> select next_id as id, alarm_status, severity, alarm_key, alarmed_resource,
>>     occurrence, source_time
>> insert into AlarmTable;
>>
>> ---------------------------------------------------------------
>> ---------------------------------------------------------------
>> -- Filter NON-INFO Events & load associated Alarms from the DB if they exist !!
>>
>> -- Severity/Level has not changed AND this is not an INFO event !
>> -- IF SO, we should have an active Alarm in the DB, and should increment its "occurrence"
>> @info(name = 'select_non_info_events')
>> from EventStream[ cast(map:get(context, 'level_changed'), 'string') != 'true' and convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int') != 0]
>> select *
>> insert into TmpNonInfoEventStream;
>>
>> @info(name = 'load_active_alarms_for_non_info_events')
>> from TmpNonInfoEventStream left outer join AlarmTable[alarm_status==1]
>>     on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type
>>         and AlarmTable.alarmed_resource == event_source)
>> select *
>> insert into TmpNonInfoEventAlarmStream;
>>
>> @info(name = 'select_alarms_to_increment')
>> from TmpNonInfoEventAlarmStream[(not (id is null)) and severity == convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int')]
>> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity,
>>     AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,
>>     AlarmTable.source_time
>> insert into AlarmsToIncrement;
>>
>> @info(name = 'increment_alarms')
>> from AlarmsToIncrement
>> select *
>> update AlarmTable
>>     set AlarmTable.occurrence = AlarmTable.occurrence+1
>>     on AlarmTable.id == id;
>>
>> Please also see the logs below:
>>
>> Log related to the successful increment of the alarm count. I can see that this
>> updates the occurrence count of the specified Alarm in the DB.
>>
>> 2018-04-04 17:22:40,599 INFO [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToIncrement : Event{timestamp=1522851760576, data=[139, 1, 1, thresholdEvent, region1.router1.cpu_utilization, 3, 1522850726953], isExpired=false}
>> 2018-04-04 17:22:40,600 INFO [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : TmpNonInfoEventAlarmStream : Event{timestamp=1522851760576, data=[null, thresholdEvent, threshold_rearmed, region1.router1.cpu_utilization, TODO, {path=region1.router1.cpu_utilization, aggr_context={average=12584.705, current=12669.98, percentile_50=12584.705, percentile_95=12669.98, percentile_100=12669.98, percentile_99=12669.98}, level_changed=false, metric_context={node_profile=null, node=null, metric_name=null, city=null, description=description, location=null, region=null}, threshold_context={selectivity=1, metric_name=cpu_utilization, critical=15000.0, warning_action=MAIL_DEVOPS, warning=8000.0, aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER, path_regex=.*cpu_utilization}, old_level=1, rate=12669.98, level=1, delta=3800994.0, value=4.22006964884E11, tenant=null, timestamp=1521697800}, 139, 1, 1, thresholdEvent, region1.router1.cpu_utilization, 3, 1522850726953], isExpired=false}
>>
>> HERE IS THE PROBLEM. I can see the AlarmsToClear log, so that exact Alarm's status
>> should be set to 0, BUT it is not changed in the DB! Further below, I can also see
>> the AlarmsToCreate log, and that item is successfully created in the DB.
>>
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed PreparedStatement Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed Connection Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> 2018-04-04 17:23:43,031 INFO [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToClear : Event{timestamp=1522851823026, data=[139, 1, 1, thresholdEvent, region1.router1.cpu_utilization, 4, 1522850726953], isExpired=false}
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed PreparedStatement Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed Connection Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the condition resolver in 'find()' method for compile condition: '?' Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed PreparedStatement Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on connection org.postgresql.jdbc.PgConnection@432af457
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed Connection Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed PreparedStatement Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (autoCommit) on connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed Connection Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the condition resolver in 'find()' method for compile condition: '?' Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed PreparedStatement Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on connection org.postgresql.jdbc.PgConnection@432af457
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed Connection Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> 2018-04-04 17:23:43,039 INFO [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToCreate : Event{timestamp=1522851823026, data=[-1, 1, 2, thresholdEvent, region1.router1.cpu_utilization, 1, 1522851823026], isExpired=false}
>> 2018-04-04 17:23:43,040 INFO [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : EventStream : Event{timestamp=1522851823026, data=[null, thresholdEvent, threshold_rearmed, region1.router1.cpu_utilization, TODO, {path=region1.router1.cpu_utilization, aggr_context={average=12584.705, current=12584.705, percentile_50=12584.705, percentile_95=12669.98, percentile_100=12669.98, percentile_99=12669.98}, level_changed=true, metric_context={node_profile=null, node=null, metric_name=null, city=null, description=description, location=null, region=null}, threshold_context={selectivity=1, metric_name=cpu_utilization, critical=11000.0, warning_action=MAIL_DEVOPS, warning=8000.0, aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER, path_regex=.*cpu_utilization}, old_level=1, rate=12584.705, level=2, delta=-7550823.0, value=4.21999414061E11, tenant=null, timestamp=1521697200}], isExpired=false}
>> [org.wso2.extension.siddhi.io.http.source.HttpWorkerThread] : Submitted Event {"event": {"path": "region1.router1.cpu_utilization", "description": "description", "value": 421999414061.0, "timestamp": 1521697200}} Stream
>> [org.wso2.transport.http.netty.listener.HTTPServerChannelInitializer] : Initializing source channel pipeline
>>
>> _______________________________________________
>> Dev mailing list
>> Dev@wso2.org <mailto:Dev@wso2.org>
>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>
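
For reference, the alarm rules described in the quoted message (increment on an unchanged non-Info level; clear and then re-create on a level change) can be written as a small in-memory Python sketch. This only illustrates the intended behaviour and is not the Siddhi app itself: `Alarm`, `AlarmBook` and `handle` are hypothetical names, and a plain list stands in for AlarmTable.

```python
from dataclasses import dataclass
from typing import List, Optional

INFO = 0  # level 0 -> Info, 1 -> warning, 2 -> critical (as in the quoted message)


@dataclass
class Alarm:
    alarm_key: str         # taken from event_type
    alarmed_resource: str  # taken from event_source
    severity: int          # taken from level
    status: int = 1        # 1 = active, 0 = cleared
    occurrence: int = 1


class AlarmBook:
    """Hypothetical in-memory stand-in for AlarmTable."""

    def __init__(self) -> None:
        self.alarms: List[Alarm] = []

    def _active(self, key: str, resource: str) -> Optional[Alarm]:
        # Same match as the join condition: active alarm for this key and resource.
        for alarm in self.alarms:
            if (alarm.status == 1 and alarm.alarm_key == key
                    and alarm.alarmed_resource == resource):
                return alarm
        return None

    def handle(self, event_type: str, event_source: str,
               level: int, level_changed: bool) -> None:
        active = self._active(event_type, event_source)
        if not level_changed:
            # Rule 1: level unchanged and not Info -> increment occurrence.
            if level != INFO and active is not None:
                active.occurrence += 1
            return
        # Rule 2.a: level changed -> clear the currently active alarm, if any.
        if active is not None:
            active.status = 0
        # Rule 2.b: new level is not Info -> create a new active alarm.
        if level != INFO:
            self.alarms.append(Alarm(event_type, event_source, level))


# Usage: warning raised, repeated once, then escalated to critical.
book = AlarmBook()
source = "region1.router1.cpu_utilization"
book.handle("thresholdEvent", source, level=1, level_changed=True)
book.handle("thresholdEvent", source, level=1, level_changed=False)
book.handle("thresholdEvent", source, level=2, level_changed=True)
```

In this sketch the clear step runs before the create step within a single call, which is the ordering the Siddhi queries above are expected to produce for one level-changed event.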