Hi All,

Please find below a sample Siddhi file and the related exception. Any comments 
are highly appreciated.

First the exception:

Caused by: java.sql.BatchUpdateException: Batch entry 0 UPDATE my_test_table SET occurrence = (my_test_table.occurrence + '_str' ) WHERE (my_test_table.id = 2 ) was aborted: ERROR: operator does not exist: bigint + character varying
  Hint: No operator matches the given name and argument type(s). You might need to add explicit type casts.
  Position: 65  Call getNextException to see other errors in the batch.
        at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:166)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:490)
        at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:835)
        at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1556)
        at com.zaxxer.hikari.proxy.StatementProxy.executeBatch(StatementProxy.java:127)
        at com.zaxxer.hikari.proxy.HikariPreparedStatementProxy.executeBatch(HikariPreparedStatementProxy.java)
        at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable.batchProcessSQLUpdates(RDBMSEventTable.java:558)



Siddhi file to reproduce the exception:

———

@App:name("Test App")
@App:description("App_Description")
-- Please refer to http://wso2.github.io/siddhi/documentation/siddhi-4.0/

-- insert into my_test_table (id, occurrence, some_key) values(1, 1, 'o');
-- insert into my_test_table (id, occurrence, some_key) values(2, 1000, 'nok');

define trigger TriggerStream at every 10 sec;

@Store(type = "rdbms", jdbc.url = "jdbc:postgresql://localhost:5432/my_db", 
username = "my_user", password = "my_password" , jdbc.driver.name = 
"org.postgresql.Driver", table.name = 'my_test_table')
@primaryKey('id')
define table TestTable(id long, occurrence long, some_key string);

@sink(type='log')
define stream UpdateNumeric(triggered_time long, id long, occurrence long, 
some_key string);

@sink(type='log')
define stream UpdateString(triggered_time long, id long, occurrence long, 
some_key string);

from TriggerStream left outer join TestTable
    on TestTable.id==1
select *
insert into UpdateNumeric;

from TriggerStream left outer join TestTable
    on TestTable.id==2
select *
insert into UpdateString;

from UpdateNumeric
select *
update TestTable
 set TestTable.occurrence = TestTable.occurrence + 1
        on TestTable.id == id;

from UpdateString
select *
update TestTable
 set TestTable.some_key = TestTable.some_key + "_str"
        on TestTable.id == id;

———
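
For comparison, this is roughly what I would expect the two update queries 
above to do on the database side. Note that the statement in the exception 
sets "occurrence" (a bigint column) to occurrence + '_str', whereas the 
UpdateString query in the file sets "some_key". The sketch below rests on 
assumptions: I use Python with an in-memory SQLite table in place of 
PostgreSQL only so that it runs on its own, the function name 
run_expected_updates is made up for illustration, and the SQL is my 
hand-written expectation, not what the RDBMS store extension generates.

```python
import sqlite3


def run_expected_updates():
    """Apply the two updates the Siddhi queries describe, as plain SQL."""
    # Assumption: an in-memory SQLite table stands in for the PostgreSQL
    # table my_test_table, only so that the sketch is self-contained.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE my_test_table ("
        "id INTEGER PRIMARY KEY, occurrence INTEGER, some_key TEXT)"
    )
    # Same seed rows as in the comments at the top of the Siddhi file.
    conn.executemany(
        "INSERT INTO my_test_table (id, occurrence, some_key) VALUES (?, ?, ?)",
        [(1, 1, "o"), (2, 1000, "nok")],
    )
    # Numeric update from the UpdateNumeric query: targets `occurrence`.
    conn.execute(
        "UPDATE my_test_table SET occurrence = occurrence + 1 WHERE id = ?",
        (1,),
    )
    # String update from the UpdateString query: targets `some_key`.
    # `||` is the SQL string concatenation operator (PostgreSQL uses it too).
    conn.execute(
        "UPDATE my_test_table SET some_key = some_key || '_str' WHERE id = ?",
        (2,),
    )
    rows = conn.execute(
        "SELECT id, occurrence, some_key FROM my_test_table ORDER BY id"
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    for row in run_expected_updates():
        print(row)
```

With the two seed rows, I would expect row 1 to end up with occurrence 2 and 
row 2 with some_key 'nok_str', while its occurrence stays untouched.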



Best regards,

Bahtiyar.

> 
>> On 4 Apr 2018, at 17:27, Bahtiyar KARANLIK <bahti...@karanlik.org 
>> <mailto:bahti...@karanlik.org>> wrote:
>> 
>> Hi All,
>> 
>> I'm working on an "alarm management" test app. The idea is as follows:
>> 
>> - I have an EventStream with the following important fields:
>> -- event_type = type of the event. i.e. threshold_exceeded_event
>> -- event_source = the entity that is going to be monitored. i.e. 
>> region1.router1.cpu_utilization
>> -- level = severity of the Event (i.e. 0 -> Info, 1-> warning, 2 -> critical)
>> -- level_changed = boolean flag (whether the level has changed from the 
>> previous event item. i.e. if level is info at t0, and warning at t1, then t1 
>> event has this level_changed flag set to true)
>> 
>> Then my algorithm comes in:
>> 
>> 1) if the level_changed flag is false and the level is not Info;
>> --> Then I should have an "Active" (status == 1) alarm in the DB, and should 
>> update its occurrence count.
>> 
>> 2) if the level_changed flag is true;
>> 2.a) I'm going to "clear" the alarm in the DB table (keywords: RDBMS Store 
>> Extension + PostgreSQL)
>> 2.b) check 'level' field, and if it is not "Info" (==0), then Create a NEW 
>> alarm in the table with the current level (level = severity).
>> 
>> Here starts the problem;
>> 
>> For events with "level_changed" set to true, the update of the active alarm's 
>> status field is not executed at all! I can create alarms (AlarmsToCreate) and 
>> increment their occurrence count (AlarmsToIncrement), but I cannot set their 
>> status fields (AlarmsToClear).
>> 
>> I guess there is a problem with multiple DB hits for the same event within a 
>> stream (i.e. clear the alarm by setting status to 0 for the alarm with id 3, 
>> and create a new alarm). I can see that the logic decides that it needs to 
>> clear an alarm and create a new one: the AlarmsToClear and AlarmsToCreate 
>> logs are both there (please check the logs below for further info).
>> 
>> Any comments or guidance are highly appreciated, since I have been stuck on 
>> this issue for two weekends now :-( Either there is something that I'm 
>> completely missing about stream processing and DB hits, or there is a bug 
>> somewhere (probably in the store-rdbms extension's update SQL compilation 
>> logic).
>> 
>> Best regards,
>> 
>> Please find below the related part of my Siddhi file:
>> 
>> ---------------------------------------------------------------
>> -- SELECT LEVEL CHANGED EVENTS
>> 
>> @info(name = 'select_level_changed_events')
>> -- from EventStream[cast(map:get(EventStream.context, 'level_changed'), 
>> 'string') == 'true' and cast(map:get(EventStream.context, 'old_level'), 
>> 'string') != '0']
>> from EventStream[cast(map:get(context, 'level_changed'), 'string') == 'true']
>> --select EventStream.tenant, EventStream.event_type, EventStream.event_uei, 
>> EventStream.event_source, EventStream.proposed_action, EventStream.context 
>> as event_context
>> select *
>> insert into TempLevelChangedEvents;
>> 
>> ---------------------------------------------------------------
>> -- CLEAR ALARMS
>> @info(name='select_alarms_to_clear')
>> from TempLevelChangedEvents left outer join AlarmTable[alarm_status==1]
>>    on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type 
>> and AlarmTable.alarmed_resource == event_source)
>> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity, 
>> AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,  
>> AlarmTable.source_time
>> insert into AlarmsToClear;
>> 
>> @info(name = 'clear_alarms')
>> from AlarmsToClear
>> select *
>> update AlarmTable
>>  set AlarmTable.occurrence = AlarmTable.occurrence, AlarmTable.alarm_status 
>> = 0
>>    on AlarmTable.id == id;
>> 
>> ---------------------------------------------------------------
>> ---------------------------------------------------------------
>> -- CREATE ALARMS
>> 
>> -- If level has changed to a NON-INFO severity/level, then we should create 
>> a new Alarm !
>> @info(name = 'select_events_to_create_alarms')
>> from TempLevelChangedEvents[convert(cast(map:get(EventStream.context, 
>> 'level'), 'string'), 'int') != 0]
>> select cast(-1, 'long') as id, cast(1, 'long') as alarm_status, 
>> convert(cast(map:get(context, 'level'), 'string'), 'int') as severity, 
>> event_type as alarm_key, event_source as alarmed_resource, cast(1, 'long') 
>> as occurrence, eventTimestamp() as source_time
>> insert into AlarmsToCreate;
>> 
>> @info(name='create_alarms')
>> from AlarmsToCreate left outer join NextAlarmId
>> select next_id as id, alarm_status, severity, alarm_key, alarmed_resource, 
>> occurrence, source_time
>> insert into AlarmTable;
>> 
>> ---------------------------------------------------------------
>> ---------------------------------------------------------------
>> -- Filter NON-INFO Events & load associated Alarms from the DB if exists !!
>> 
>> -- Severity/Level has not changed AND this is not an INFO event !
>> -- IF SO, we should have an active Alarm in the DB, and should increment its 
>> "occurrence"
>> @info(name = 'select_non_info_events')
>> from EventStream[ cast(map:get(context, 'level_changed'), 'string') != 
>> 'true' and convert(cast(map:get(EventStream.context, 'level'), 'string'), 
>> 'int') != 0]
>> select *
>> insert into TmpNonInfoEventStream;
>> 
>> @info(name = 'load_active_alarms_for_non_info_events')
>> from TmpNonInfoEventStream left outer join AlarmTable[alarm_status==1]
>>    on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type 
>> and AlarmTable.alarmed_resource == event_source)
>> select *
>> insert into TmpNonInfoEventAlarmStream;
>> 
>> @info(name = 'select_alarms_to_increment')
>> from TmpNonInfoEventAlarmStream[(not (id is null)) and severity == 
>> convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int')]
>> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity, 
>> AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,  
>> AlarmTable.source_time
>> insert into AlarmsToIncrement;
>> 
>> 
>> @info(name = 'increment_alarms')
>> from AlarmsToIncrement
>> select *
>> update AlarmTable
>>  set AlarmTable.occurrence = AlarmTable.occurrence+1
>>    on AlarmTable.id == id;
>> 
>> Please also see the logs below.
>> 
>> Log related to the successful increment of the alarm count. I can see that 
>> this updates the occurrence count of the specified alarm in the DB.
>> 
>> 2018-04-04 17:22:40,599 INFO  
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToIncrement 
>> : Event{timestamp=1522851760576, data=[139, 1, 1, thresholdEvent, 
>> region1.router1.cpu_utilization, 3, 1522850726953], isExpired=false}
>> 2018-04-04 17:22:40,600 INFO  
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : 
>> TmpNonInfoEventAlarmStream : Event{timestamp=1522851760576, data=[null, 
>> thresholdEvent, threshold_rearmed, region1.router1.cpu_utilization, TODO, 
>> {path=region1.router1.cpu_utilization, aggr_context={average=12584.705, 
>> current=12669.98, percentile_50=12584.705, percentile_95=12669.98, 
>> percentile_100=12669.98, percentile_99=12669.98}, level_changed=false, 
>> metric_context={node_profile=null, node=null, metric_name=null, city=null, 
>> description=description, location=null, region=null}, 
>> threshold_context={selectivity=1, metric_name=cpu_utilization, 
>> critical=15000.0, warning_action=MAIL_DEVOPS, warning=8000.0, 
>> aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER, 
>> path_regex=.*cpu_utilization}, old_level=1, rate=12669.98, level=1, 
>> delta=3800994.0, value=4.22006964884E11, tenant=null, timestamp=1521697800}, 
>> 139, 1, 1, thresholdEvent, region1.router1.cpu_utilization, 3, 
>> 1522850726953], isExpired=false}
>> 
>> HERE IS THE PROBLEM. I can see the AlarmsToClear event, so that exact alarm's 
>> status should be set to 0, BUT it is not changed in the DB! Further below, I 
>> can see the AlarmsToCreate log, and that item is successfully created in the DB.
>> 
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> PreparedStatement Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on 
>> connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> Connection Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> 2018-04-04 17:23:43,031 INFO  
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToClear : 
>> Event{timestamp=1522851823026, data=[139, 1, 1, thresholdEvent, 
>> region1.router1.cpu_utilization, 4, 1522850726953], isExpired=false}
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> PreparedStatement Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on 
>> connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> Connection Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the 
>> condition resolver in 'find()' method for compile condition: '?' Ignored 
>> FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> PreparedStatement Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on 
>> connection org.postgresql.jdbc.PgConnection@432af457
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> Connection Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> PreparedStatement Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (autoCommit) on 
>> connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> Connection Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the 
>> condition resolver in 'find()' method for compile condition: '?' Ignored 
>> FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> PreparedStatement Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on 
>> connection org.postgresql.jdbc.PgConnection@432af457
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed 
>> Connection Ignored FQCN: 
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> 2018-04-04 17:23:43,039 INFO  
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToCreate : 
>> Event{timestamp=1522851823026, data=[-1, 1, 2, thresholdEvent, 
>> region1.router1.cpu_utilization, 1, 1522851823026], isExpired=false}
>> 2018-04-04 17:23:43,040 INFO  
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : EventStream : 
>> Event{timestamp=1522851823026, data=[null, thresholdEvent, 
>> threshold_rearmed, region1.router1.cpu_utilization, TODO, 
>> {path=region1.router1.cpu_utilization, aggr_context={average=12584.705, 
>> current=12584.705, percentile_50=12584.705, percentile_95=12669.98, 
>> percentile_100=12669.98, percentile_99=12669.98}, level_changed=true, 
>> metric_context={node_profile=null, node=null, metric_name=null, city=null, 
>> description=description, location=null, region=null}, 
>> threshold_context={selectivity=1, metric_name=cpu_utilization, 
>> critical=11000.0, warning_action=MAIL_DEVOPS, warning=8000.0, 
>> aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER, 
>> path_regex=.*cpu_utilization}, old_level=1, rate=12584.705, level=2, 
>> delta=-7550823.0, value=4.21999414061E11, tenant=null, 
>> timestamp=1521697200}], isExpired=false}
>> [org.wso2.extension.siddhi.io.http.source.HttpWorkerThread] : Submitted 
>> Event {"event": {"path": "region1.router1.cpu_utilization", "description": 
>> "description", "value": 421999414061.0, "timestamp": 1521697200}} Stream
>> [org.wso2.transport.http.netty.listener.HTTPServerChannelInitializer] : 
>> Initializing source channel pipeline
>> 
>> 
>> 
>> 
>> _______________________________________________
>> Dev mailing list
>> Dev@wso2.org <mailto:Dev@wso2.org>
>> http://wso2.org/cgi-bin/mailman/listinfo/dev
> 
