[+ Mohan, Tishan]

On Sun, Apr 8, 2018 at 9:21 PM, Bahtiyar KARANLIK <bahti...@karanlik.org>
wrote:

>
> Hi All,
>
> Please find below a sample siddhi file, and the related exception. Any
> comments are highly appreciated.
>
> First the exception:
>
> Caused by: java.sql.BatchUpdateException: Batch entry 0 UPDATE
> my_test_table SET occurrence = (my_test_table.occurrence + '_str' ) WHERE (
> my_test_table.id = 2 ) was aborted: ERROR: operator does not exist:
> bigint + character varying
>   Hint: No operator matches the given name and argument type(s). You might
> need to add explicit type casts.
>   Position: 65  Call getNextException to see other errors in the batch.
> at org.postgresql.jdbc.BatchResultHandler.handleCompletion(
> BatchResultHandler.java:166)
> at org.postgresql.core.v3.QueryExecutorImpl.execute(
> QueryExecutorImpl.java:490)
> at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:835)
> at org.postgresql.jdbc.PgPreparedStatement.executeBatch(
> PgPreparedStatement.java:1556)
> at com.zaxxer.hikari.proxy.StatementProxy.executeBatch(
> StatementProxy.java:127)
> at com.zaxxer.hikari.proxy.HikariPreparedStatementProxy.executeBatch(
> HikariPreparedStatementProxy.java)
> at org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable.
> batchProcessSQLUpdates(RDBMSEventTable.java:558)
>
>
>
> Siddhi file to reproduce the exception:
>
> ———
>
> @App:name("Test App")
> @App:description("App_Description")
> -- Please refer to http://wso2.github.io/siddhi/documentation/siddhi-4.0/
>
> -- insert into my_test_table (id, occurrence, some_key) values(1, 1, 'o');
> -- insert into my_test_table (id, occurrence, some_key) values(2, 1000, 'nok');
>
> define trigger TriggerStream at every 10 sec;
>
> @Store(type = "rdbms", jdbc.url = "jdbc:postgresql://localhost:5432/my_db",
> username = "my_user", password = "my_password" , jdbc.driver.name =
> "org.postgresql.Driver", table.name = 'my_test_table')
> @primaryKey('id')
> define table TestTable(id long, occurrence long, some_key string);
>
> @sink(type='log')
> define stream UpdateNumeric(triggered_time long, id long, occurrence long,
> some_key string);
>
> @sink(type='log')
> define stream UpdateString(triggered_time long, id long, occurrence long,
> some_key string);
>
> from TriggerStream left outer join TestTable
>     on TestTable.id==1
> select *
> insert into UpdateNumeric;
>
> from TriggerStream left outer join TestTable
>     on TestTable.id==2
> select *
> insert into UpdateString;
>
> from UpdateNumeric
> select *
> update TestTable
>  set TestTable.occurrence = TestTable.occurrence + 1
>   on TestTable.id == id;
>
> from UpdateString
> select *
> update TestTable
>  set TestTable.some_key = TestTable.some_key + "_str"
>   on TestTable.id == id;
>
> ———
>
>
>
> Best regards,
>
> Bahtiyar.
>
>
> On 4 Apr 2018, at 17:27, Bahtiyar KARANLIK <bahti...@karanlik.org> wrote:
>
> Hi All,
>
> I'm working on an "alarm management" test app. The idea is as follows:
>
> - I have an EventStream with the following important fields:
> -- event_type = type of the event, e.g. threshold_exceeded_event
> -- event_source = the entity being monitored, e.g.
> region1.router1.cpu_utilization
> -- level = severity of the event (0 -> Info, 1 -> Warning, 2 ->
> Critical)
> -- level_changed = boolean flag: whether the level has changed from the
> previous event, e.g. if level is Info at t0 and Warning at t1, then the
> t1 event has level_changed set to true
>
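
The level_changed flag described above can be illustrated with a small Python sketch. This is only an illustration under assumptions: the function name flag_level_changes is hypothetical, and treating the very first event as "not changed" is a guess, since the thread does not say how the flag is produced upstream.

```python
from typing import Iterable, List, Optional, Tuple


def flag_level_changes(levels: Iterable[int]) -> List[Tuple[int, bool]]:
    """Pair each level with a flag saying whether it differs from the previous level."""
    flagged: List[Tuple[int, bool]] = []
    previous: Optional[int] = None
    for level in levels:
        # Assumption: the first event has no predecessor, so it is not flagged.
        changed = previous is not None and level != previous
        flagged.append((level, changed))
        previous = level
    return flagged


# Info at t0, Warning at t1 and t2, Critical at t3.
print(flag_level_changes([0, 1, 1, 2]))
# prints [(0, False), (1, True), (1, False), (2, True)]
```
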
> Then my algorithm comes in:
>
> 1) if the level_changed flag is false and the level is not Info;
> --> Then I should have an "Active" (status == 1) alarm in the DB, and
> should update its occurrence count.
>
> 2) if the level_changed flag is true;
> 2.a) I'm going to "clear" the alarm in the DB table (keywords: RDBMS Store
> Extension + PostgreSQL)
> 2.b) check 'level' field, and if it is not "Info" (==0), then Create a NEW
> alarm in the table with the current level (level = severity).
>
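
The per-event decision described in steps 1, 2.a and 2.b can be sketched in Python as follows. It is a minimal sketch, not the Siddhi app itself: the function name plan_alarm_actions and the action labels are hypothetical, and it only returns which table operations the description implies, in the order given.

```python
from typing import List

INFO = 0  # level 0 -> Info, as in the field list above


def plan_alarm_actions(level: int, level_changed: bool) -> List[str]:
    """Return the table operations the described algorithm implies for one event."""
    if level_changed:
        actions = ["clear_active_alarm"]      # step 2.a: set status to 0
        if level != INFO:
            actions.append("create_alarm")    # step 2.b: new alarm at current level
        return actions
    if level != INFO:
        return ["increment_occurrence"]       # step 1: active alarm seen again
    return []                                 # unchanged Info event: nothing to do


print(plan_alarm_actions(level=2, level_changed=True))
# prints ['clear_active_alarm', 'create_alarm']
```

An event with level_changed=true and level=2 therefore implies two writes to the same table for a single event (a clear followed by a create), which is the case reported as failing in this thread.
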
> Here is where the problem starts:
>
> For events with "level_changed" set to true, the update of the active
> alarm's status field is not executed at all! I can create alarms
> (AlarmsToCreate) and increment their occurrence count (AlarmsToIncrement),
> but I cannot set their status fields (AlarmsToClear).
>
> My guess is that there is a problem with multiple DB hits for the same
> event within a stream (e.g. clearing an alarm by setting the status of
> the alarm with id 3 to 0, and then creating a new alarm). I can see that
> the logic decides that it needs to clear an alarm and create a new one:
> the AlarmsToClear and AlarmsToCreate log entries are both there (please
> check the logs below for further info).
>
> *Any comments or guidance are highly appreciated, since I have been stuck
> on this issue for two weekends now :-( Either there is something that I'm
> completely missing about stream processing and DB hits, or there is a
> bug somewhere (probably in the store-rdbms extension's logic for
> compiling the update SQL).*
>
> Best regards,
>
> *Please find below the related part of my Siddhi file:*
>
> ---------------------------------------------------------------
> -- SELECT LEVEL CHANGED EVENTS
>
> @info(name = 'select_level_changed_events')
> -- from EventStream[cast(map:get(EventStream.context, 'level_changed'),
> -- 'string') == 'true' and cast(map:get(EventStream.context, 'old_level'),
> -- 'string') != '0']
> from EventStream[cast(map:get(context, 'level_changed'), 'string') == 'true']
> -- select EventStream.tenant, EventStream.event_type, EventStream.event_uei,
> -- EventStream.event_source, EventStream.proposed_action, EventStream.context as
> -- event_context
> select *
> insert into TempLevelChangedEvents;
>
> ---------------------------------------------------------------
> -- CLEAR ALARMS
> @info(name='select_alarms_to_clear')
> from TempLevelChangedEvents left outer join AlarmTable[alarm_status==1]
>    on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type 
> and AlarmTable.alarmed_resource == event_source)
> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity, 
> AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,  
> AlarmTable.source_time
> insert into AlarmsToClear;
>
> @info(name = 'clear_alarms')
> from AlarmsToClear
> select *
> update AlarmTable
>  set AlarmTable.occurrence = AlarmTable.occurrence, AlarmTable.alarm_status = 0
>    on AlarmTable.id == id;
>
> ---------------------------------------------------------------
> ---------------------------------------------------------------
> -- CREATE ALARMS
>
> -- If level has changed to a NON-INFO severity/level, then we should create a
> -- new Alarm!
> @info(name = 'select_events_to_create_alarms')
> from TempLevelChangedEvents[convert(cast(map:get(EventStream.context, 
> 'level'), 'string'), 'int') != 0]
> select cast(-1, 'long') as id, cast(1, 'long') as alarm_status, 
> convert(cast(map:get(context, 'level'), 'string'), 'int') as severity, 
> event_type as alarm_key, event_source as alarmed_resource, cast(1, 'long') as 
> occurrence, eventTimestamp() as source_time
> insert into AlarmsToCreate;
>
> @info(name='create_alarms')
> from AlarmsToCreate left outer join NextAlarmId
> select next_id as id, alarm_status, severity, alarm_key, alarmed_resource, 
> occurrence, source_time
> insert into AlarmTable;
>
> ---------------------------------------------------------------
> ---------------------------------------------------------------
> -- Filter NON-INFO events & load associated alarms from the DB if they exist
>
> -- Severity/level has not changed AND this is not an INFO event!
> -- If so, we should have an active alarm in the DB, and should increment its
> -- "occurrence"
> @info(name = 'select_non_info_events')
> from EventStream[ cast(map:get(context, 'level_changed'), 'string') != 'true' 
> and convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int') != 
> 0]
> select *
> insert into TmpNonInfoEventStream;
>
> @info(name = 'load_active_alarms_for_non_info_events')
> from TmpNonInfoEventStream left outer join AlarmTable[alarm_status==1]
>    on (AlarmTable.alarm_status == 1 and AlarmTable.alarm_key == event_type 
> and AlarmTable.alarmed_resource == event_source)
> select *
> insert into TmpNonInfoEventAlarmStream;
>
> @info(name = 'select_alarms_to_increment')
> from TmpNonInfoEventAlarmStream[(not (id is null)) and severity == 
> convert(cast(map:get(EventStream.context, 'level'), 'string'), 'int')]
> select AlarmTable.id, AlarmTable.alarm_status, AlarmTable.severity, 
> AlarmTable.alarm_key, AlarmTable.alarmed_resource, AlarmTable.occurrence,  
> AlarmTable.source_time
> insert into AlarmsToIncrement;
>
>
> @info(name = 'increment_alarms')
> from AlarmsToIncrement
> select *
> update AlarmTable
>  set AlarmTable.occurrence = AlarmTable.occurrence+1
>    on AlarmTable.id == id;
>
> *Please also see the logs below:*
> Log for a *successful* increment of the alarm count. I can see that
> this updates the occurrence count of the specified alarm in the DB.
>
> 2018-04-04 17:22:40,599 INFO  
> [org.wso2.siddhi.core.stream.output.sink.LogSink]
> - mond : *AlarmsToIncrement* : Event{timestamp=1522851760576, data=[139,
> 1, 1, thresholdEvent, region1.router1.cpu_utilization, 3, 1522850726953],
> isExpired=false}
> 2018-04-04 17:22:40,600 INFO  
> [org.wso2.siddhi.core.stream.output.sink.LogSink]
> - mond : TmpNonInfoEventAlarmStream : Event{timestamp=1522851760576,
> data=[null, thresholdEvent, threshold_rearmed, 
> region1.router1.cpu_utilization,
> TODO, {path=region1.router1.cpu_utilization, aggr_context={average=12584.705,
> current=12669.98, percentile_50=12584.705, percentile_95=12669.98,
> percentile_100=12669.98, percentile_99=12669.98}, level_changed=false,
> metric_context={node_profile=null, node=null, metric_name=null,
> city=null, description=description, location=null, region=null},
> threshold_context={selectivity=1, metric_name=cpu_utilization,
> critical=15000.0, warning_action=MAIL_DEVOPS, warning=8000.0,
> aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER,
> path_regex=.*cpu_utilization}, old_level=1, rate=12669.98, level=1,
> delta=3800994.0, value=4.22006964884E11, tenant=null,
> timestamp=1521697800}, 139, 1, 1, thresholdEvent, 
> region1.router1.cpu_utilization,
> 3, 1522850726953], isExpired=false}
>
> *HERE IS THE PROBLEM. I can see the AlarmsToClear event, so that exact
> alarm's status should be set to 0, BUT it is not changed in the DB! Below
> that, I can also see the AlarmsToCreate log, and that item is
> successfully created in the DB.*
>
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> PreparedStatement Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on
> connection org.postgresql.jdbc.PgConnection@202d9236
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> Connection Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> 2018-04-04 17:23:43,031 INFO  
> [org.wso2.siddhi.core.stream.output.sink.LogSink]
> - mond : *AlarmsToClear* : Event{timestamp=1522851823026, data=[139, 1,
> 1, thresholdEvent, region1.router1.cpu_utilization, 4, 1522850726953],
> isExpired=false}
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> ResultSet Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> PreparedStatement Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (nothing) on
> connection org.postgresql.jdbc.PgConnection@202d9236
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> Connection Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the
> condition resolver in 'find()' method for compile condition: '?' Ignored
> FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> ResultSet Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> PreparedStatement Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on
> connection org.postgresql.jdbc.PgConnection@432af457
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> Connection Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> PreparedStatement Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-2 - Reset (autoCommit) on
> connection org.postgresql.jdbc.PgConnection@202d9236
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> Connection Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.RDBMSEventTable] : Ignore the
> condition resolver in 'find()' method for compile condition: '?' Ignored
> FQCN: org.apache.commons.logging.impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> ResultSet Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> PreparedStatement Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> [com.zaxxer.hikari.pool.PoolElf] : HikariPool-3 - Reset (nothing) on
> connection org.postgresql.jdbc.PgConnection@432af457
> [org.wso2.extension.siddhi.store.rdbms.util.RDBMSTableUtils] : Closed
> Connection Ignored FQCN: org.apache.commons.logging.
> impl.SLF4JLocationAwareLog
> 2018-04-04 17:23:43,039 INFO  
> [org.wso2.siddhi.core.stream.output.sink.LogSink]
> - mond : *AlarmsToCreate* : Event{timestamp=1522851823026, data=[-1, 1,
> 2, thresholdEvent, region1.router1.cpu_utilization, 1, 1522851823026],
> isExpired=false}
> 2018-04-04 17:23:43,040 INFO  
> [org.wso2.siddhi.core.stream.output.sink.LogSink]
> - mond : EventStream : Event{timestamp=1522851823026, data=[null,
> thresholdEvent, threshold_rearmed, region1.router1.cpu_utilization, TODO,
> {path=region1.router1.cpu_utilization, aggr_context={average=12584.705,
> current=12584.705, percentile_50=12584.705, percentile_95=12669.98,
> percentile_100=12669.98, percentile_99=12669.98}, level_changed=true,
> metric_context={node_profile=null, node=null, metric_name=null,
> city=null, description=description, location=null, region=null},
> threshold_context={selectivity=1, metric_name=cpu_utilization,
> critical=11000.0, warning_action=MAIL_DEVOPS, warning=8000.0,
> aggr_field=rate, id=1, aggr_function=average, critical_action=MAIL_MANAGER,
> path_regex=.*cpu_utilization}, old_level=1, rate=12584.705, level=2,
> delta=-7550823.0, value=4.21999414061E11, tenant=null,
> timestamp=1521697200}], isExpired=false}
> [org.wso2.extension.siddhi.io.http.source.HttpWorkerThread] : Submitted
> Event {"event": {"path": "region1.router1.cpu_utilization",
> "description": "description", "value": 421999414061.0, "timestamp":
> 1521697200}} Stream
> [org.wso2.transport.http.netty.listener.HTTPServerChannelInitializer] :
> Initializing source channel pipeline
>
>
>
>
> _______________________________________________
> Dev mailing list
> Dev@wso2.org
> http://wso2.org/cgi-bin/mailman/listinfo/dev
>


-- 
W.G. Gihan Anuruddha
Associate Technical Lead | WSO2, Inc.