[+ Mohan, Tishan] On Sun, Apr 8, 2018 at 9:21 PM, Bahtiyar KARANLIK <bahti...@karanlik.org> wrote:
> > Hi All, > > Please find below a sample siddhi file, and the related exception. Any > comments are highly appreciated. > > First the exception: > > Caused by: java.sql.BatchUpdateException: Batch entry 0 UPDATE > my_test_table SET occurrence = (my_test_table.occurrence + '_str' ) WHERE ( > my_test_table.id = 2 ) was aborted: ERROR: operator does not exist: > bigint + character varying > Hint: No operator matches the given name and argument type(s). You might > need to add explicit type casts. > Position: 65 Call getNextException to see other errors in the batch. > at org.postgresql.jdbc.BatchResultHandler.handleCompletion( > BatchResultHandler.java:166) > at org.postgresql.core.v3.QueryExecutorImpl.execute( > QueryExecutorImpl.java:490) > at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:835) > at org.postgresql.jdbc.PgPreparedStatement.executeBatch( > PgPreparedStatement.java:1556) > at com.zaxxer.hikari.proxy.StatementProxy.executeBatch( > StatementProxy.java:127) > at com.zaxxer.hikari.proxy.HikariPreparedStatementProxy.executeBatch( > HikariPreparedStatementProxy.java) > at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable. > batchProcessSQLUpdates(RDBMSEventTable.java:558) > > > > Siddhi file to re-produce the exception; > > ——— > > @App:name("Test App") > @App:description("App_Description") > -- Please refer to http://wso2.github.io/siddhi/documentation/siddhi-4.0/ > > -- insert into my_test_table (id, occurrence, some_key) values(1, 1, 'o'); > -- insert into my_test_table (id, occurrence, some_key) values(2, 1000, > 'nok'); > > define trigger TriggerStream at every 10 sec; > > @Store(type = "rdbms", jdbc.url = "jdbc:postgresql://localhost:5432/my_db", > username = "my_user", password = "my_password" , jdbc.driver.name = > "org.postgresql.Driver", table.name = 'my_test_table') > @primaryKey('id') > define table TestTable(id long, occurrence long, some_key string); > > @sink(type='log') > define stream UpdateNumeric(triggered_time long, id long, occurrence long, > some_key string); > > @sink(type='log') > define stream UpdateString(triggered_time long, id long, occurrence long, > some_key string); > > from TriggerStream left outer join TestTable > on TestTable.id==1 > select * > insert into UpdateNumeric; > > from TriggerStream left outer join TestTable > on TestTable.id==2 > select * > insert into UpdateString; > > from UpdateNumeric > select * > update TestTable > set TestTable.occurrence = TestTable.occurrence + 1 > on TestTable.id == id; > > from UpdateString > select * > update TestTable > set TestTable.some_key = TestTable.some_key + "_str" > on TestTable.id == id; > > ——— > > > > Best regards, > > Bahtiyar. > > > On 4 Apr 2018, at 17:27, Bahtiyar KARANLIK <bahti...@karanlik.org> wrote: > > Hi All, > > I'm working on an "alarm management" test app. The idea is as follows: > > - I've an EventStream with following important fields: > -- event_type = type of the event. i.e. threshold_exceeded_event > -- event_source = the entity that is going to be monitored. i.e. > region1.router1.cpu_utilization > -- level = severity of the Event (i.e. 0 -> Info, 1-> warning, 2 -> > critical) > -- level_changed = boolean flag (whether the level has changed from the > previous event item. i.e. if level is info at t0, and warning at t1, then > t1 event has this level_changed flag set to true) > > Then my algorithm comes in: > > 1) if the level_changed flag is false and if it is not Info; > --> Then I should have an "Active" (status == 1) alarm in the DB, and > should update its occurence count. > > 2) if the level_changed flag is true; > 2.a) I'm going to "clear" the alarm in the DB table (keywords: RDBMS Store > Extension + PostgreSQL) > 2.b) check 'level' field, and if it is not "Info" (==0), then Create a NEW > alarm in the table with the current level (level = severity). > > Here starts the problem; > > for events with "level_changed" set to true, updating the active Alarms's > status field is not executed at all!! I can create Alarms (AlarmsToCreate), > Increment their Occurence count (AlarmsToIncrement), but can not set their > status fields (AlarmsToClear). > > I guess, there is a problem with multiple DB hits (i.e. clear the > Alarm-set status to 0 for alarm with id 3 and create a new Alarm) for the > same event (within a stream). I can see that the logic can decide that it > needs to Clear an alarm, and create a new one (logs with AlarmsToClear and > AlarmsToCreate are there !. Please check below logs for further info). > > *Any comments, guidance, is highly appreciated, since I'm stuck with this > issue for two weekends up to now :-( Either, there is something that I'm > completely missing related to Stream processing and DB hits or there is a > bug somewhere over there (probably with store-rdbms extension's > update-sql-compile logics..) * > > Best regards, > > *Please find below related part from my siddhi file:* > > --------------------------------------------------------------- > -- SELECT LEVEL CHANGED EVENTS > > @info(name = 'select_level_changed_events') > -- from EventStream[cast(map:get(EventStream.context, 'level_changed'), > 'string') == 'true' and cast(map:get(EventStream.context, 'old_level'), > 'string') != '0'] > from EventStream[cast(map:get(context, 'level_changed'), 'string') == 'true'] > --select EventStream.tenant, EventStream.event_type, EventStream.event_uei, > EventStream.event_source, EventStream.proposed_action, EventStream.context as > event_context > select * > insert into TempLevelChangedEvents; > > --------------------------------------------------------------- > -- CLEAR ALARMS > @info(name='select_alarms_to_clear') > from TempLevelChangedEvents left outer join AlarmTable[alarm_status==1] > on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type > and AlarmTable.alarmed_resource == event_source) > select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity, > AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence, > AlarmTable.source_time > insert into AlarmsToClear; > > @info(name = 'clear_alarms') > from AlarmsToClear > select * > update AlarmTable > set AlarmTable.occurrence = AlarmTable.occurrence, AlarmTable.alarm_status = > 0 > on AlarmTable.id == id; > > --------------------------------------------------------------- > --------------------------------------------------------------- > -- CREATE ALARMS > > -- If level has changed to a NON-INFO severity/level, then we should create a > new Alarm ! > @info(name = 'select_events_to_create_alarms') > from TempLevelChangedEvents[convert(cast(map:get(EventStream.context, > 'level'), 'string'), 'int') != 0] > select cast(-1, 'long') as id, cast(1, 'long') as alarm_status, > convert(cast(map:get(context, 'level'), 'string'), 'int') as severity, > event_type as alarm_key, event_source as alarmed_resource, cast(1, 'long') as > occurrence, eventTimestamp() as source_time > insert into AlarmsToCreate; > > @info(name='create_alarms') > from AlarmsToCreate left outer join NextAlarmId > select next_id as id, alarm_status, severity, alarm_key, alarmed_resource, > occurrence, source_time > insert into AlarmTable; > > --------------------------------------------------------------- > --------------------------------------------------------------- > -- Filter NON-INFO Events & load associated Alarms from the DB if exists !! > > -- Severity/Level has not changed AND this is not an INFO event ! > -- IF SO, we should have an active Alarm in the DB, and should increment its > "occurence" > @info(name = 'select_non_info_events') > from EventStream[ cast(map:get(context, 'level_changed'), 'string') != 'true' > and convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int') != > 0] > select * > insert into TmpNonInfoEventStream; > > @info(name = 'load_active_alarms_for_non_info_events') > from TmpNonInfoEventStream left outer join AlarmTable[alarm_status==1] > on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type > and AlarmTable.alarmed_resource == event_source) > select * > insert into TmpNonInfoEventAlarmStream; > > @info(name = 'select_alarms_to_increment') > from TmpNonInfoEventAlarmStream[(not (id is null)) and severity == > convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int')] > select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity, > AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence, > AlarmTable.source_time > insert into AlarmsToIncrement; > > > @info(name = 'increment_alarms') > from AlarmsToIncrement > select * > update AlarmTable > set AlarmTable.occurrence = AlarmTable.occurrence+1 > on AlarmTable.id == id; > > *Also please see the logs below as well:* > Log related to *successfull* increment of the alarm count. I can see that > this updates the occurrence count of the specified Alarm in the DB. > > 2018-04-04 17:22:40,599 INFO > [org.wso2.siddhi.core.stream.output.sink.LogSink] > - mond : *AlarmsToIncrement* : Event{timestamp=1522851760576, data=[139, > 1, 1, thresholdEvent, region1.router1.cpu_utilization, 3, 1522850726953], > isExpired=false} > 2018-04-04 17:22:40,600 INFO > [org.wso2.siddhi.core.stream.output.sink.LogSink] > - mond : TmpNonInfoEventAlarmStream : Event{timestamp=1522851760576, > data=[null, thresholdEvent, threshold_rearmed, > region1.router1.cpu_utilization, > TODO, {path=region1.router1.cpu_utilization, aggr_context={average=12584.705, > current=12669.98, percentile_50=12584.705, percentile_95=12669.98, > percentile_100=12669.98, percentile_99=12669.98}, level_changed=false, > metric_context={node_profile=null, node=null, metric_name=null, > city=null, description=description, location=null, region=null}, > threshold_context={selectivity=1, metric_name=cpu_utilization, > critical=15000.0, warning_action=MAIL_DEVOPS, warning=8000.0, > aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER, > path_regex=.*cpu_utilization}, old_level=1, rate=12669.98, level=1, > delta=3800994.0, value=4.22006964884E11, tenant=null, > timestamp=1521697800}, 139, 1, 1, thresholdEvent, > region1.router1.cpu_utilization, > 3, 1522850726953], isExpired=false} > > *HERE IS THE PROBLEM. I can see the AlarmsToClear, that exact Alarm's > status must be set to 0. BUT, it is not changed in the DB! Also below, I > can see the AlarmsToCreate log, and that item is successfully created in > the DB..* > > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > PreparedStatement Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on > connection org.postgresql.jdbc.PgConnection@202d9236 > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > Connection Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > 2018-04-04 17:23:43,031 INFO > [org.wso2.siddhi.core.stream.output.sink.LogSink] > - mond : *AlarmsToClear* : Event{timestamp=1522851823026, data=[139, 1, > 1, thresholdEvent, region1.router1.cpu_utilization, 4, 1522850726953], > isExpired=false} > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > ResultSet Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > PreparedStatement Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on > connection org.postgresql.jdbc.PgConnection@202d9236 > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > Connection Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the > condition resolver in 'find()' method for compile condition: '?' Ignored > FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > ResultSet Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > PreparedStatement Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on > connection org.postgresql.jdbc.PgConnection@432af457 > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > Connection Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > PreparedStatement Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (autoCommit) on > connection org.postgresql.jdbc.PgConnection@202d9236 > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > Connection Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the > condition resolver in 'find()' method for compile condition: '?' Ignored > FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > ResultSet Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > PreparedStatement Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on > connection org.postgresql.jdbc.PgConnection@432af457 > [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed > Connection Ignored FQCN: org.apache.commons.logging. > impl.SLF4JLocationAwareLog > 2018-04-04 17:23:43,039 INFO > [org.wso2.siddhi.core.stream.output.sink.LogSink] > - mond : *AlarmsToCreate* : Event{timestamp=1522851823026, data=[-1, 1, > 2, thresholdEvent, region1.router1.cpu_utilization, 1, 1522851823026], > isExpired=false} > 2018-04-04 17:23:43,040 INFO > [org.wso2.siddhi.core.stream.output.sink.LogSink] > - mond : EventStream : Event{timestamp=1522851823026, data=[null, > thresholdEvent, threshold_rearmed, region1.router1.cpu_utilization, TODO, > {path=region1.router1.cpu_utilization, aggr_context={average=12584.705, > current=12584.705, percentile_50=12584.705, percentile_95=12669.98, > percentile_100=12669.98, percentile_99=12669.98}, level_changed=true, > metric_context={node_profile=null, node=null, metric_name=null, > city=null, description=description, location=null, region=null}, > threshold_context={selectivity=1, metric_name=cpu_utilization, > critical=11000.0, warning_action=MAIL_DEVOPS, warning=8000.0, > aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER, > path_regex=.*cpu_utilization}, old_level=1, rate=12584.705, level=2, > delta=-7550823.0, value=4.21999414061E11, tenant=null, > timestamp=1521697200}], isExpired=false} > [org.wso2.extension.siddhi.io.http.source.HttpWorkerThread] : Submitted > Event {"event": {"path": "region1.router1.cpu_utilization", > "description": "description", "value": 421999414061.0, "timestamp": > 1521697200}} Stream > [org.wso2.transport.http.netty.listener.HTTPServerChannelInitializer] : > Initializing source channel pipeline > > > > > _______________________________________________ > Dev mailing list > Dev@wso2.org > http://wso2.org/cgi-bin/mailman/listinfo/dev > > > > > _______________________________________________ > Dev mailing list > Dev@wso2.org > http://wso2.org/cgi-bin/mailman/listinfo/dev > > -- W.G. Gihan Anuruddha Associate Technical Lead | WSO2, Inc.
_______________________________________________ Dev mailing list Dev@wso2.org http://wso2.org/cgi-bin/mailman/listinfo/dev