Hi,

I have seen this same sort of exception occur when a HashMap is used by multiple threads concurrently. It was necessary to use a ConcurrentHashMap, or to add proper synchronization in our own logic. This has been explained as state corruption [3 - *(interesting read)*], which is no surprise when looking at the source of the relevant methods [1].
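For illustration, here is a minimal sketch of the same fail-fast mechanism. The real failure needs two threads racing, which is hard to reproduce deterministically, so this sketch modifies a HashMap mid-iteration from a single thread; it trips the identical modCount check that the multi-threaded case does (the table names are illustrative only):

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class FailFastDemo {
    public static void main(String[] args) {
        Map<String, String> tables = new HashMap<>();
        tables.put("Light_Sensor", "indexed");
        tables.put("Pressure_Sensor", "indexed");

        boolean threw = false;
        try {
            // toArray() and for-each both walk the map with a fail-fast
            // iterator; a structural modification mid-iteration makes the
            // iterator's next() fail its modCount check.
            for (String key : tables.keySet()) {
                tables.remove(key); // structural modification during iteration
            }
        } catch (ConcurrentModificationException e) {
            threw = true;
        }
        System.out.println("ConcurrentModificationException thrown: " + threw);
    }
}
```

With two threads the same check fires only when the timing lines up, which is why the task fails intermittently rather than on every run.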
As there is no such thing as a ConcurrentHashSet (and since that would not be the best option anyway), we should synchronize removeIndexedTable, refreshIndexedTableArray, and getAllIndexedTables to correct this behaviour. getAllIndexedTables should be synchronized because it might otherwise return a Set that is in an inconsistent state, if it is invoked while another thread is executing removeIndexedTable or refreshIndexedTableArray. The caller of getAllIndexedTables might also invoke one of the Set's state-changing methods before the refresh or remove completes, leaving the Set with corrupted state. [2] is also interesting, although it does not relate to this directly: with newSetFromMap you can create a Set backed by a ConcurrentHashMap.

[1] http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/AbstractCollection.java#AbstractCollection.toArray%28%29
[2] http://docs.oracle.com/javase/6/docs/api/java/util/Collections.html#newSetFromMap%28java.util.Map%29
[3] http://mailinator.blogspot.gr/2009/06/beautiful-race-condition.html

Best Regards,
Ayoma.

On Wed, Dec 16, 2015 at 3:32 PM, Sumedha Rubasinghe <sume...@wso2.com> wrote:
> We have DAS Lite included in IoT Server and several summarisation scripts
> deployed. Server is going OOM frequently with following exception.
>
> Shouldn't this [1] method be synchronised?
>
> [1]
> https://github.com/wso2/carbon-analytics/blob/master/components/analytics-core/org.wso2.carbon.analytics.dataservice.core/src/main/java/org/wso2/carbon/analytics/dataservice/core/indexing/AnalyticsIndexedTableStore.java#L45
>
> >>>>>>>>>>>
> [2015-12-16 15:11:00,004] INFO {org.wso2.carbon.analytics.spark.core.AnalyticsTask} - Executing the schedule task for: Light_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,005] INFO {org.wso2.carbon.analytics.spark.core.AnalyticsTask} - Executing the schedule task for: Magnetic_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,005] INFO {org.wso2.carbon.analytics.spark.core.AnalyticsTask} - Executing the schedule task for: Pressure_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,006] INFO {org.wso2.carbon.analytics.spark.core.AnalyticsTask} - Executing the schedule task for: Proximity_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,006] INFO {org.wso2.carbon.analytics.spark.core.AnalyticsTask} - Executing the schedule task for: Rotation_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:00,007] INFO {org.wso2.carbon.analytics.spark.core.AnalyticsTask} - Executing the schedule task for: Temperature_Sensor_Script for tenant id: -1234
> [2015-12-16 15:11:01,132] ERROR {org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter} - Error in executing task: null
> java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
>     at java.util.HashMap$KeyIterator.next(HashMap.java:956)
>     at java.util.AbstractCollection.toArray(AbstractCollection.java:195)
>     at org.wso2.carbon.analytics.dataservice.core.indexing.AnalyticsIndexedTableStore.refreshIndexedTableArray(AnalyticsIndexedTableStore.java:46)
>     at org.wso2.carbon.analytics.dataservice.core.indexing.AnalyticsIndexedTableStore.addIndexedTable(AnalyticsIndexedTableStore.java:37)
>     at org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.refreshIndexedTableStoreEntry(AnalyticsDataServiceImpl.java:512)
>     at org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.invalidateAnalyticsTableInfo(AnalyticsDataServiceImpl.java:525)
>     at org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.checkAndInvalidateTableInfo(AnalyticsDataServiceImpl.java:504)
>     at org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.setTableSchema(AnalyticsDataServiceImpl.java:495)
>     at org.wso2.carbon.analytics.spark.core.sources.AnalyticsRelation.insert(AnalyticsRelation.java:162)
>     at org.apache.spark.sql.sources.InsertIntoDataSource.run(commands.scala:53)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
>     at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
>     at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:128)
>     at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
>     at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:755)
>     at org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQueryLocal(SparkAnalyticsExecutor.java:710)
>     at org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQuery(SparkAnalyticsExecutor.java:692)
>     at org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeQuery(CarbonAnalyticsProcessorService.java:199)
>     at org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeScript(CarbonAnalyticsProcessorService.java:149)
>     at org.wso2.carbon.analytics.spark.core.AnalyticsTask.execute(AnalyticsTask.java:57)
>     at org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)
>     at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> [2015-12-16 15:12:00,001] INFO {org.wso2.carbon.analytics.spark.core.AnalyticsTask} - Executing the schedule task for: Accelerometer_Sensor_Script for tenant id: -1234
>
> --
> /sumedha
> m: +94 773017743
> b : bit.ly/sumedha
>
> _______________________________________________
> Dev mailing list
> Dev@wso2.org
> http://wso2.org/cgi-bin/mailman/listinfo/dev
>

--
Ayoma Wijethunga
Software Engineer
WSO2, Inc.; http://wso2.com
lean.enterprise.middleware

Mobile : +94 (0) 719428123
Blog : http://www.ayomaonline.com
LinkedIn: https://www.linkedin.com/in/ayoma
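P.S. To make the newSetFromMap suggestion [2] concrete, here is a minimal sketch (the names are illustrative, not taken from AnalyticsIndexedTableStore): the JDK has no ConcurrentHashSet, but Collections.newSetFromMap over a ConcurrentHashMap yields a thread-safe Set whose iterators are weakly consistent, so they never throw ConcurrentModificationException even while the set is being modified:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetDemo {
    public static void main(String[] args) {
        // A Set view backed by a ConcurrentHashMap: add/remove/iterate
        // (including toArray) are safe across threads.
        Set<String> indexedTables =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        indexedTables.add("Light_Sensor");
        indexedTables.add("Pressure_Sensor");

        // Removing entries mid-iteration would throw
        // ConcurrentModificationException on a plain HashSet; the weakly
        // consistent iterator tolerates it.
        for (String table : indexedTables) {
            indexedTables.remove(table);
        }
        System.out.println("remaining=" + indexedTables.size());
    }
}
```

Note this only makes the individual Set operations safe; compound operations such as "remove, then rebuild the array" still need synchronization around the whole sequence if callers must never observe the intermediate state.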