Hi,

I have seen that same sort of exception occurs, when a HashMap is used by
multiple threads concurrently. It was necessary to use ConcurrentHashMap or
do the proper synchronization in our logic. This was explained as a state
corruption [3 - *(interesting read)*] and it is no wonder looking at source
of relevant methods [1].

As there is no such thing called ConcurrentHashSet (and as it is not the
best option), we should synchronizing removeIndexedTable,
refreshIndexedTableArray, as well as getAllIndexedTables to correct this
behaviour.

"getAllIndexedTables" should be synchronized because we might return a
"Set" which is in an inconsistent state, if "getAllIndexedTables" is
invoked while some other thread executes removeIndexedTable or
refreshIndexedTableArray. And may be, the method which called
"getAllIndexedTables"
might attempt to execute another state changing method available in "Set"
before refresh or remove completes, the "Set" ending up with state
corruption.

[2] is also interesting although it does not directly relate with this.
With "newSetFromMap" you can create a backed Set based on a
ConcurrentHashMap.


[1]
http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/AbstractCollection.java#AbstractCollection.toArray%28%29
[2]
http://docs.oracle.com/javase/6/docs/api/java/util/Collections.html#newSetFromMap%28java.util.Map%29
[3] http://mailinator.blogspot.gr/2009/06/beautiful-race-condition.html

Best Regards,
Ayoma.

On Wed, Dec 16, 2015 at 3:32 PM, Sumedha Rubasinghe <sume...@wso2.com>
wrote:

> We have DAS Lite included in IoT Server and several summarisation scripts
> deployed. Server is going OOM frequently with following exception.
>
> Shouldn't this[1] method be synchronised?
>
> [1]
> https://github.com/wso2/carbon-analytics/blob/master/components/analytics-core/org.wso2.carbon.analytics.dataservice.core/src/main/java/org/wso2/carbon/analytics/dataservice/core/indexing/AnalyticsIndexedTableStore.java#L45
>
>
> >>>>>>>>>>>
> [2015-12-16 15:11:00,004]  INFO
> {org.wso2.carbon.analytics.spark.core.AnalyticsTask} -  Executing the
> schedule task for: Light_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,005]  INFO
> {org.wso2.carbon.analytics.spark.core.AnalyticsTask} -  Executing the
> schedule task for: Magnetic_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,005]  INFO
> {org.wso2.carbon.analytics.spark.core.AnalyticsTask} -  Executing the
> schedule task for: Pressure_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,006]  INFO
> {org.wso2.carbon.analytics.spark.core.AnalyticsTask} -  Executing the
> schedule task for: Proximity_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,006]  INFO
> {org.wso2.carbon.analytics.spark.core.AnalyticsTask} -  Executing the
> schedule task for: Rotation_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,007]  INFO
> {org.wso2.carbon.analytics.spark.core.AnalyticsTask} -  Executing the
> schedule task for: Temperature_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:01,132] ERROR
> {org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter} -  Error in
> executing task: null
> java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
> at java.util.HashMap$KeyIterator.next(HashMap.java:956)
> at java.util.AbstractCollection.toArray(AbstractCollection.java:195)
> at
> org.wso2.carbon.analytics.dataservice.core.indexing.AnalyticsIndexedTableStore.refreshIndexedTableArray(AnalyticsIndexedTableStore.java:46)
> at
> org.wso2.carbon.analytics.dataservice.core.indexing.AnalyticsIndexedTableStore.addIndexedTable(AnalyticsIndexedTableStore.java:37)
> at
> org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.refreshIndexedTableStoreEntry(AnalyticsDataServiceImpl.java:512)
> at
> org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.invalidateAnalyticsTableInfo(AnalyticsDataServiceImpl.java:525)
> at
> org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.checkAndInvalidateTableInfo(AnalyticsDataServiceImpl.java:504)
> at
> org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.setTableSchema(AnalyticsDataServiceImpl.java:495)
> at
> org.wso2.carbon.analytics.spark.core.sources.AnalyticsRelation.insert(AnalyticsRelation.java:162)
> at org.apache.spark.sql.sources.InsertIntoDataSource.run(commands.scala:53)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:128)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:755)
> at
> org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQueryLocal(SparkAnalyticsExecutor.java:710)
> at
> org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQuery(SparkAnalyticsExecutor.java:692)
> at
> org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeQuery(CarbonAnalyticsProcessorService.java:199)
> at
> org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeScript(CarbonAnalyticsProcessorService.java:149)
> at
> org.wso2.carbon.analytics.spark.core.AnalyticsTask.execute(AnalyticsTask.java:57)
> at
> org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)
> at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> [2015-12-16 15:12:00,001]  INFO
> {org.wso2.carbon.analytics.spark.core.AnalyticsTask} -  Executing the
> schedule task for: Accelerometer_Sensor_Script for tenant id: -1234
>
> --
> /sumedha
> m: +94 773017743
> b :  bit.ly/sumedha
>
> _______________________________________________
> Dev mailing list
> Dev@wso2.org
> http://wso2.org/cgi-bin/mailman/listinfo/dev
>
>


-- 
Ayoma Wijethunga
Software Engineer
WSO2, Inc.; http://wso2.com
lean.enterprise.middleware

Mobile : +94 (0) 719428123 <+94+(0)+719428123>
Blog : http://www.ayomaonline.com
LinkedIn: https://www.linkedin.com/in/ayoma
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to