Hi all,

I have been working on adding incremental processing support to BAM. This feature was implemented in BAM 2.4.0 [1], but it was not well tested in fully distributed mode before the BAM 2.4.0 release, so it was marked as an experimental feature. You can find the implementation details of this feature in [2].
Over the last week I have been testing this feature with a 3-node Hadoop cluster and a Cassandra cluster, and all the use cases mentioned in [1] ran on the external cluster without any issues.

I have also implemented an incremental average operation, incr_avg(), which calculates the final value incrementally from the previous Hive query execution and the current one. The intermediate values needed for the incremental operation are stored in Cassandra and used in the current Hive query execution. Once the current execution completes, the stored intermediate results are replaced with the new values. I have tested incr_avg() in a fully distributed setup as well, and it works well. As a first step, I have implemented only incr_avg(), to verify that the adopted approach works in a fully distributed setup.

Below is a sample Hive script that uses the incr_avg() function:

    CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable
        (orderID STRING, brandName STRING, userName STRING, quantity INT, version STRING)
        STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
        WITH SERDEPROPERTIES (
            "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
            "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi",
            "cassandra.columns.mapping" = ":key,payload_brand, payload_user, payload_quantity, Version" );

    @Incremental(name="avgAnalysis", tables="PhoneSalesTable", bufferTime="20")
    select brandName, count(DISTINCT orderID), incr_avg(quantity, "average_quantity")
    from PhoneSalesTable
    where version = "1.0.0"
    group by brandName;

The following are the to-do items for this feature:
- Load-test all cases/scenarios in a distributed setup.
- Implement more commonly used Hive functions as incremental functions (incr_count, incr_sum, etc.).
- Write a sample that explains how to use this feature.
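To illustrate the general idea behind an incremental average, here is a minimal Java sketch. It is not BAM's actual incr_avg() implementation and does not use the Hive UDF API. It assumes the persisted intermediate values are a running sum and count per group (the email does not say exactly what is stored), and it uses an in-memory map as a hypothetical stand-in for the Cassandra store. The class IncrementalAverage, its nested Partial type and the run() method are all hypothetical names. Each call to run() processes only the rows that arrived since the previous execution, merges them with the stored state, and replaces that state for the next run.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of an incremental average. The in-memory map stands in
 * for the Cassandra column family assumed to hold intermediate values.
 */
public class IncrementalAverage {

    /** Intermediate state kept per group between executions (assumed: sum + count). */
    static final class Partial {
        final double sum;
        final long count;

        Partial(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        Partial merge(Partial other) {
            return new Partial(sum + other.sum, count + other.count);
        }

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Stand-in for the persisted intermediate store, keyed by "<name>:<group>".
    private final Map<String, Partial> store = new HashMap<>();

    /**
     * Aggregates only the values that are new since the previous execution,
     * merges them with the stored state, replaces that state, and returns the
     * overall average for the group.
     */
    public double run(String name, String group, List<Integer> newValues) {
        double batchSum = 0;
        for (int v : newValues) {
            batchSum += v;
        }
        Partial batch = new Partial(batchSum, newValues.size());
        String key = name + ":" + group;
        Partial previous = store.getOrDefault(key, new Partial(0, 0));
        Partial combined = previous.merge(batch);
        store.put(key, combined); // replace intermediate values for the next run
        return combined.average();
    }

    public static void main(String[] args) {
        IncrementalAverage avg = new IncrementalAverage();
        // First execution sees quantities 2 and 4: average 3.0
        System.out.println(avg.run("average_quantity", "BrandA", List.of(2, 4)));
        // Second execution sees only the new quantity 9: (2 + 4 + 9) / 3 = 5.0
        System.out.println(avg.run("average_quantity", "BrandA", List.of(9)));
    }
}
```

Running main() prints 3.0 and then 5.0. The sketch stores the sum and count rather than the previous average, because averaging two averages gives the wrong result whenever the batches contain different numbers of rows (here, (3.0 + 9) / 2 = 6.0 instead of 5.0).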
[1] http://docs.wso2.org/pages/viewpage.action?pageId=32345660
[2] http://wso2-oxygen-tank.10903.n7.nabble.com/Incremental-Data-Processing-for-BAM-td77582.html

Thanks,
Sinthuja.

--
Sinthuja Rajendran
Software Engineer
WSO2, Inc.: http://wso2.com
Blog: http://sinthu-rajan.blogspot.com/
Mobile: +94774273955
Architecture mailing list
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
