Hi All,
Please find below a sample Siddhi file and the related exception. Any comments
are highly appreciated.
First, the exception:
Caused by: java.sql.BatchUpdateException: Batch entry 0 UPDATE my_test_table
SET occurrence = (my_test_table.occurrence + '_str' ) WHERE (my_test_table.id =
2 ) was aborted: ERROR: operator does not exist: bigint + character varying
Hint: No operator matches the given name and argument type(s). You might need
to add explicit type casts.
Position: 65 Call getNextException to see other errors in the batch.
at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:166)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:490)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:835)
at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1556)
at com.zaxxer.hikari.proxy.StatementProxy.executeBatch(StatementProxy.java:127)
at com.zaxxer.hikari.proxy.HikariPreparedStatementProxy.executeBatch(HikariPreparedStatementProxy.java)
at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable.batchProcessSQLUpdates(RDBMSEventTable.java:558)
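Note that the generated SQL above sets the occurrence column, although the
failing query (the one for id 2) sets some_key.
Siddhi file to reproduce the exception: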
———
@App:name("Test App")
@App:description("App_Description")
-- Please refer to http://wso2.github.io/siddhi/documentation/siddhi-4.0/
-- insert into my_test_table (id, occurrence, some_key) values(1, 1, 'o');
-- insert into my_test_table (id, occurrence, some_key) values(2, 1000, 'nok');
define trigger TriggerStream at every 10 sec;
@Store(type = "rdbms", jdbc.url = "jdbc:postgresql://localhost:5432/my_db",
username = "my_user", password = "my_password" , jdbc.driver.name =
"org.postgresql.Driver", table.name = 'my_test_table')
@primaryKey('id')
define table TestTable(id long, occurrence long, some_key string);
@sink(type='log')
define stream UpdateNumeric(triggered_time long, id long, occurrence long,
some_key string);
@sink(type='log')
define stream UpdateString(triggered_time long, id long, occurrence long,
some_key string);
from TriggerStream left outer join TestTable
on TestTable.id==1
select *
insert into UpdateNumeric;
from TriggerStream left outer join TestTable
on TestTable.id==2
select *
insert into UpdateString;
from UpdateNumeric
select *
update TestTable
set TestTable.occurrence = TestTable.occurrence + 1
on TestTable.id == id;
from UpdateString
select *
update TestTable
set TestTable.some_key = TestTable.some_key + "_str"
on TestTable.id == id;
———
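For reference, what the two update queries are meant to do can be sketched in
plain SQL. The snippet below is only an illustration, not what the RDBMS store
extension generates: it uses Python's built-in sqlite3 module with an in-memory
database as a stand-in for PostgreSQL, and the helper name
run_intended_updates is made up for this example. It uses || for string
concatenation, the standard SQL operator that PostgreSQL also supports; the
error above shows that + is not defined for bigint and character varying.

```python
import sqlite3


def run_intended_updates() -> list:
    """Apply the two intended updates to a copy of the sample rows."""
    conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL database
    conn.execute(
        "CREATE TABLE my_test_table ("
        "id INTEGER PRIMARY KEY, occurrence INTEGER, some_key TEXT)"
    )
    # Same sample rows as in the comments at the top of the Siddhi file.
    conn.executemany(
        "INSERT INTO my_test_table (id, occurrence, some_key) VALUES (?, ?, ?)",
        [(1, 1, "o"), (2, 1000, "nok")],
    )
    # Numeric case (UpdateNumeric): increment the counter of row 1.
    conn.execute(
        "UPDATE my_test_table SET occurrence = occurrence + 1 WHERE id = ?", (1,)
    )
    # String case (UpdateString): append a suffix to some_key of row 2.
    conn.execute(
        "UPDATE my_test_table SET some_key = some_key || ? WHERE id = ?",
        ("_str", 2),
    )
    rows = conn.execute(
        "SELECT id, occurrence, some_key FROM my_test_table ORDER BY id"
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(run_intended_updates())
```

Row 1 should end up with occurrence 2, and row 2 with some_key 'nok_str' and
an unchanged occurrence of 1000.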
Best regards,
Bahtiyar.
>
>> On 4 Apr 2018, at 17:27, Bahtiyar KARANLIK <[email protected]> wrote:
>>
>> Hi All,
>>
>> I'm working on an "alarm management" test app. The idea is as follows:
>>
>> - I have an EventStream with the following important fields:
>> -- event_type = type of the event, e.g. threshold_exceeded_event
>> -- event_source = the entity that is going to be monitored, e.g.
>> region1.router1.cpu_utilization
>> -- level = severity of the event (0 -> Info, 1 -> warning, 2 -> critical)
>> -- level_changed = boolean flag: whether the level has changed from the
>> previous event, e.g. if level is Info at t0 and warning at t1, then the t1
>> event has level_changed set to true
>>
>> Then my algorithm comes in:
>>
>> 1) If the level_changed flag is false and the level is not Info:
>> --> then I should have an "Active" (status == 1) alarm in the DB, and should
>> update its occurrence count.
>>
>> 2) If the level_changed flag is true:
>> 2.a) I'm going to "clear" the alarm in the DB table (keywords: RDBMS Store
>> Extension + PostgreSQL)
>> 2.b) check the 'level' field, and if it is not "Info" (== 0), then create a NEW
>> alarm in the table with the current level (level = severity).
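
The decision logic above can be sketched roughly as follows. This is only an
illustrative Python sketch, not part of the Siddhi app: the function name
decide_actions and the action labels are made up for this example, and level 0
is assumed to mean Info, as described above.

```python
from typing import List

INFO = 0  # level 0 -> Info, 1 -> warning, 2 -> critical (as listed above)


def decide_actions(level: int, level_changed: bool) -> List[str]:
    """Return the alarm actions for one event (hypothetical helper).

    The labels loosely mirror the stream names in the Siddhi file:
    "increment" ~ AlarmsToIncrement, "clear" ~ AlarmsToClear,
    "create" ~ AlarmsToCreate.
    """
    actions: List[str] = []
    if not level_changed:
        # Step 1: same non-Info level as before -> bump the occurrence count.
        if level != INFO:
            actions.append("increment")
        return actions
    # Step 2.a: the level changed -> clear the currently active alarm, if any.
    actions.append("clear")
    # Step 2.b: a non-Info level needs a new alarm at the current severity.
    if level != INFO:
        actions.append("create")
    return actions


if __name__ == "__main__":
    print(decide_actions(level=1, level_changed=False))
    print(decide_actions(level=2, level_changed=True))
    print(decide_actions(level=0, level_changed=True))
```

The case that fails for me is the second one: a changed, non-Info level should
lead to both a clear and a create for the same event.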
>>
>> Here is where the problem starts:
>>
>> For events with "level_changed" set to true, the update of the active alarm's
>> status field is not executed at all! I can create alarms (AlarmsToCreate) and
>> increment their occurrence count (AlarmsToIncrement), but cannot set their
>> status field (AlarmsToClear).
>>
>> I guess there is a problem with multiple DB hits for the same event within a
>> stream (i.e. clear the alarm, setting status to 0 for the alarm with id 3, and
>> create a new alarm). I can see that the logic decides that it needs to
>> clear an alarm and create a new one (the AlarmsToClear and AlarmsToCreate
>> logs are there; please check the logs below for further info).
>>
>> Any comments or guidance are highly appreciated, since I have been stuck on
>> this issue for two weekends now :-( Either there is something that I'm
>> completely missing about stream processing and DB hits, or there is a
>> bug somewhere (probably in the store-rdbms extension's
>> update-SQL compile logic).
>>
>> Best regards,
>>
>> Please find below the related part of my Siddhi file:
>>
>> ---------------------------------------------------------------
>> -- SELECT LEVEL CHANGED EVENTS
>>
>> @info(name = 'select_level_changed_events')
>> -- from EventStream[cast(map:get(EventStream.context, 'level_changed'),
>> --   'string') == 'true' and cast(map:get(EventStream.context, 'old_level'),
>> --   'string') != '0']
>> from EventStream[cast(map:get(context, 'level_changed'), 'string') == 'true']
>> -- select EventStream.tenant, EventStream.event_type, EventStream.event_uei,
>> --   EventStream.event_source, EventStream.proposed_action, EventStream.context
>> --   as event_context
>> select *
>> insert into TempLevelChangedEvents;
>>
>> ---------------------------------------------------------------
>> -- CLEAR ALARMS
>> @info(name='select_alarms_to_clear')
>> from TempLevelChangedEvents left outer join AlarmTable[alarm_status==1]
>> on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type
>> and AlarmTable.alarmed_resource == event_source)
>> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity,
>> AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,
>> AlarmTable.source_time
>> insert into AlarmsToClear;
>>
>> @info(name = 'clear_alarms')
>> from AlarmsToClear
>> select *
>> update AlarmTable
>> set AlarmTable.occurrence = AlarmTable.occurrence, AlarmTable.alarm_status
>> = 0
>> on AlarmTable.id == id;
>>
>> ---------------------------------------------------------------
>> ---------------------------------------------------------------
>> -- CREATE ALARMS
>>
>> -- If level has changed to a NON-INFO severity/level, then we should create
>> -- a new alarm!
>> @info(name = 'select_events_to_create_alarms')
>> from TempLevelChangedEvents[convert(cast(map:get(EventStream.context,
>> 'level'), 'string'), 'int') != 0]
>> select cast(-1, 'long') as id, cast(1, 'long') as alarm_status,
>> convert(cast(map:get(context, 'level'), 'string'), 'int') as severity,
>> event_type as alarm_key, event_source as alarmed_resource, cast(1, 'long')
>> as occurrence, eventTimestamp() as source_time
>> insert into AlarmsToCreate;
>>
>> @info(name='create_alarms')
>> from AlarmsToCreate left outer join NextAlarmId
>> select next_id as id, alarm_status, severity, alarm_key, alarmed_resource,
>> occurrence, source_time
>> insert into AlarmTable;
>>
>> ---------------------------------------------------------------
>> ---------------------------------------------------------------
>> -- Filter NON-INFO Events & load associated Alarms from the DB if exists !!
>>
>> -- Severity/Level has not changed AND this is not an INFO event!
>> -- If so, we should have an active alarm in the DB, and should increment its
>> -- "occurrence"
>> @info(name = 'select_non_info_events')
>> from EventStream[ cast(map:get(context, 'level_changed'), 'string') !=
>> 'true' and convert(cast(map:get(EventStream.context, 'level'), 'string'),
>> 'int') != 0]
>> select *
>> insert into TmpNonInfoEventStream;
>>
>> @info(name = 'load_active_alarms_for_non_info_events')
>> from TmpNonInfoEventStream left outer join AlarmTable[alarm_status==1]
>> on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type
>> and AlarmTable.alarmed_resource == event_source)
>> select *
>> insert into TmpNonInfoEventAlarmStream;
>>
>> @info(name = 'select_alarms_to_increment')
>> from TmpNonInfoEventAlarmStream[(not (id is null)) and severity ==
>> convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int')]
>> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity,
>> AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,
>> AlarmTable.source_time
>> insert into AlarmsToIncrement;
>>
>>
>> @info(name = 'increment_alarms')
>> from AlarmsToIncrement
>> select *
>> update AlarmTable
>> set AlarmTable.occurrence = AlarmTable.occurrence+1
>> on AlarmTable.id == id;
>> Please also see the logs below.
>> Log related to the successful increment of the alarm count. I can see that this
>> updates the occurrence count of the specified alarm in the DB.
>>
>> 2018-04-04 17:22:40,599 INFO
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToIncrement
>> : Event{timestamp=1522851760576, data=[139, 1, 1, thresholdEvent,
>> region1.router1.cpu_utilization, 3, 1522850726953], isExpired=false}
>> 2018-04-04 17:22:40,600 INFO
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond :
>> TmpNonInfoEventAlarmStream : Event{timestamp=1522851760576, data=[null,
>> thresholdEvent, threshold_rearmed, region1.router1.cpu_utilization, TODO,
>> {path=region1.router1.cpu_utilization, aggr_context={average=12584.705,
>> current=12669.98, percentile_50=12584.705, percentile_95=12669.98,
>> percentile_100=12669.98, percentile_99=12669.98}, level_changed=false,
>> metric_context={node_profile=null, node=null, metric_name=null, city=null,
>> description=description, location=null, region=null},
>> threshold_context={selectivity=1, metric_name=cpu_utilization,
>> critical=15000.0, warning_action=MAIL_DEVOPS, warning=8000.0,
>> aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER,
>> path_regex=.*cpu_utilization}, old_level=1, rate=12669.98, level=1,
>> delta=3800994.0, value=4.22006964884E11, tenant=null, timestamp=1521697800},
>> 139, 1, 1, thresholdEvent, region1.router1.cpu_utilization, 3,
>> 1522850726953], isExpired=false}
>>
>> HERE IS THE PROBLEM. I can see the AlarmsToClear event, so that exact alarm's
>> status should have been set to 0, BUT it is not changed in the DB! Below, I can
>> also see the AlarmsToCreate log, and that item is successfully created in the DB.
>>
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> PreparedStatement Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on
>> connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> Connection Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> 2018-04-04 17:23:43,031 INFO
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToClear :
>> Event{timestamp=1522851823026, data=[139, 1, 1, thresholdEvent,
>> region1.router1.cpu_utilization, 4, 1522850726953], isExpired=false}
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> PreparedStatement Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on
>> connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> Connection Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the
>> condition resolver in 'find()' method for compile condition: '?' Ignored
>> FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> PreparedStatement Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on
>> connection org.postgresql.jdbc.PgConnection@432af457
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> Connection Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> PreparedStatement Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (autoCommit) on
>> connection org.postgresql.jdbc.PgConnection@202d9236
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> Connection Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the
>> condition resolver in 'find()' method for compile condition: '?' Ignored
>> FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> ResultSet Ignored FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> PreparedStatement Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on
>> connection org.postgresql.jdbc.PgConnection@432af457
>> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
>> Connection Ignored FQCN:
>> org.apache.commons.logging.impl.SLF4JLocationAwareLog
>> 2018-04-04 17:23:43,039 INFO
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : AlarmsToCreate :
>> Event{timestamp=1522851823026, data=[-1, 1, 2, thresholdEvent,
>> region1.router1.cpu_utilization, 1, 1522851823026], isExpired=false}
>> 2018-04-04 17:23:43,040 INFO
>> [org.wso2.siddhi.core.stream.output.sink.LogSink] - mond : EventStream :
>> Event{timestamp=1522851823026, data=[null, thresholdEvent,
>> threshold_rearmed, region1.router1.cpu_utilization, TODO,
>> {path=region1.router1.cpu_utilization, aggr_context={average=12584.705,
>> current=12584.705, percentile_50=12584.705, percentile_95=12669.98,
>> percentile_100=12669.98, percentile_99=12669.98}, level_changed=true,
>> metric_context={node_profile=null, node=null, metric_name=null, city=null,
>> description=description, location=null, region=null},
>> threshold_context={selectivity=1, metric_name=cpu_utilization,
>> critical=11000.0, warning_action=MAIL_DEVOPS, warning=8000.0,
>> aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER,
>> path_regex=.*cpu_utilization}, old_level=1, rate=12584.705, level=2,
>> delta=-7550823.0, value=4.21999414061E11, tenant=null,
>> timestamp=1521697200}], isExpired=false}
>> [org.wso2.extension.siddhi.io.http.source.HttpWorkerThread] : Submitted
>> Event {"event": {"path": "region1.router1.cpu_utilization", "description":
>> "description", "value": 421999414061.0, "timestamp": 1521697200}} Stream
>> [org.wso2.transport.http.netty.listener.HTTPServerChannelInitializer] :
>> Initializing source channel pipeline
>>
>>
>>
>>
>> _______________________________________________
>> Dev mailing list
>> [email protected] <mailto:[email protected]>
>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>