Author: cdouglas
Date: Fri Dec 19 17:11:42 2008
New Revision: 728206

URL: http://svn.apache.org/viewvc?rev=728206&view=rev
Log:
HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa. Contributed by Eric Yang

Removed:
    hadoop/core/trunk/src/contrib/chukwa/conf/mysql_create_tables
    hadoop/core/trunk/src/contrib/chukwa/conf/mysql_upgrade_tables
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh
    hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql
    hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec 19 17:11:42 2008
@@ -481,6 +481,9 @@
     HADOOP-4849. Documentation for Service Level Authorization implemented in
     HADOOP-4348. (acmurthy)
 
+    HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa (Eric
+    Yang via cdouglas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh (original)
+++ hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh Fri Dec 19 17:11:42 2008
@@ -37,18 +37,17 @@
     cat ${CHUKWA_CONF_DIR}/jdbc.conf | \
     while read LINE; do
         CLUSTER=`echo ${LINE} | cut -f 1 -d'='`
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Consolidator
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650 &
     done
     end=`date +%s`
     duration=$(( $end - $start ))
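
Note on the dbAdmin.sh hunk above: besides dropping the Consolidator invocation (its work moves into the aggregator.sql macros), every remaining java command now ends in "&", so the TableCreator, Aggregator, and DataExpiration jobs for each cluster are launched concurrently instead of serially. A rough Java-flavored sketch of that concurrency change, with a hypothetical cluster name and simplified argument handling:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParallelDbAdminSketch {
    public static void main(String[] args) throws IOException {
        // Class names come from dbAdmin.sh; "-DCLUSTER=demo" is a stand-in value.
        String[] jobs = {
            "org.apache.hadoop.chukwa.database.TableCreator",
            "org.apache.hadoop.chukwa.database.Aggregator",
            "org.apache.hadoop.chukwa.database.DataExpiration"
        };
        List<Process> running = new ArrayList<Process>();
        for (String job : jobs) {
            // Start each JVM without waiting for the previous one, like "command &".
            running.add(new ProcessBuilder("java", "-DCLUSTER=demo", job).start());
        }
        // Like the shell loop, this does not wait for the children to finish.
    }
}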

Modified: hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql (original)
+++ hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql Fri Dec 19 17:11:42 2008
@@ -1,12 +1,88 @@
-insert into [cluster_system_metrics] (select timestamp,[avg(system_metrics)] 
from [system_metrics] where timestamp between '[past_hour]' and '[now]' group 
by timestamp);
-insert into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from 
[dfs_datanode] where timestamp between '[past_hour]' and '[now]' group by 
timestamp);
-insert into [cluster_disk] (select 
a.timestamp,a.mount,a.used,a.available,a.used_percent from (select 
from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as 
timestamp,mount,avg(used) as used,avg(available) as available,avg(used_percent) 
as used_percent from [disk] where timestamp between '[past_hour]' and '[now]' 
group by timestamp,mount) as a group by a.timestamp, a.mount);
-insert into [hod_job_digest] (select 
timestamp,d.hodid,d.userid,[avg(system_metrics)] from (select a.HodID,b.host as 
machine,a.userid,a.starttime,a.endtime from [HodJob] a join [hod_machine] b on 
(a.HodID = b.HodID) where endtime between '[past_hour]' and '[now]') as 
d,[system_metrics] where timestamp between d.starttime and d.endtime and 
host=d.machine group by hodid,timestamp);
-insert into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from 
[hadoop_rpc] where timestamp between '[past_hour]' and '[now]' group by 
timestamp);
-#insert into [cluster_hadoop_mapred] (select 
timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job] where timestamp 
between '[past_hour]' and '[now]' group by timestamp);
-insert into [user_util] (select timestamp, j.UserID as user, 
sum(j.NumOfMachines) as node_total, sum(cpu_idle_pcnt*j.NumOfMachines) as 
cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines) as cpu_used, 
avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt, 
sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
 as disk_unused, 
sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
 as disk_used, 
avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as 
disk_used_pcnt, sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused, 
sum(eth0_busy_pcnt*j.NumOfMachines) as network_used, avg(eth0_busy_pcnt) as 
network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines) as memory_unused, 
sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as 
memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID) 
and Timestamp between '[past_hour]' and '[now]' group by j.UserID);
 #insert into [node_util] select starttime, avg(unused) as unused, avg(used) as 
used from (select DATE_FORMAT(m.LAUNCH_TIME,'%Y-%m-%d %H:%i:%s') as 
starttime,sum(AvgCPUBusy*j.NumOfMachines/(60*100)) as 
unused,sum((100-AvgCPUBusy)*j.NumOfMachines/(60*100)) as used from HodJobDigest 
d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) 
where m.LAUNCH_TIME >= '2008-09-12 21:11' and m.LAUNCH_TIME <= '2008-09-12 
22:11' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group 
by m.MRJobID order by m.LAUNCH_TIME) as t group by t.starttime 
 #insert into [jobtype_util] select CASE WHEN MRJobName like 'PigLatin%' THEN 
'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like 
'%abacus%' THEN 'Abacus' ELSE 'Other' END as m, count(*)*j.NumOfMachines/60 as 
nodehours,count(distinct(MRJobID)) as jobs from HodJobDigest d join HodJob j on 
(d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= 
'2008-09-12 21:11' and d.Timestamp <= '2008-09-12 22:11' and d.Timestamp >= 
m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by CASE WHEN MRJobName 
like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' 
WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END order by CASE 
WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' 
THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END
-#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy * 
j.NumOfMachines) / (sum(j.NumOfMachines) * 1)) as used from Digest d join 
HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_hour]' and 
d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp 
-#insert into [b] select m, sum(foo.nodehours) as nodehours from (select 
m.MRJobID, round(avg(if(AvgCPUBusy is null,0,AvgCPUBusy)),0) as m, 
count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest d join HodJob j on 
(d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= 
'[past_hour]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME and 
d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m; 
-#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN 
MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 
'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END as 
interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as 
jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on 
(m.HodID = j.HodID) where d.Timestamp >= '[past_hour]' and d.Timestamp <= 
'[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group 
by AvgCPUBusy,CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName 
like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' 
ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy * 
j.NumOfMachines) / (sum(j.NumOfMachines) * 1)) as used from Digest d join 
HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]' and 
d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp 
+#insert into [b] select m, sum(foo.nodehours) as nodehours from (select 
m.MRJobID, round(avg(if(AvgCPUBusy is null,0,AvgCPUBusy)),0) as m, 
count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest d join HodJob j on 
(d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= 
'[past_10_minutes]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME 
and d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m; 
+#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN 
MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 
'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END as 
interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as 
jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on 
(m.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]' and d.Timestamp <= 
'[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group 
by AvgCPUBusy,CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName 
like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' 
ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [cluster_hadoop_mapred] (select 
timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job] where timestamp 
between '[past_10_minutes]' and '[now]' group by timestamp);
+replace into [cluster_system_metrics] (select timestamp,[avg(system_metrics)] 
from [system_metrics] where timestamp between '[past_10_minutes]' and 
'[past_5_minutes]' group by timestamp);
+replace into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from 
[dfs_datanode] where timestamp between '[past_10_minutes]' and 
'[past_5_minutes]' group by timestamp);
+replace into [cluster_disk] (select 
a.timestamp,a.mount,a.used,a.available,a.used_percent from (select 
from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as 
timestamp,mount,avg(used) as used,avg(available) as available,avg(used_percent) 
as used_percent from [disk] where timestamp between '[past_10_minutes]' and 
'[past_5_minutes]' group by timestamp,mount) as a group by a.timestamp, 
a.mount);
+replace delayed into [hod_job_digest] (select 
timestamp,d.hodid,d.userid,[avg(system_metrics)] from (select a.HodID,b.host as 
machine,a.userid,a.starttime,a.endtime from [HodJob] a join [hod_machine] b on 
(a.HodID = b.HodID) where endtime between '[past_10_minutes]' and 
'[past_5_minutes]') as d,[system_metrics] where timestamp between d.starttime 
and d.endtime and host=d.machine group by hodid,timestamp);
+replace into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from 
[hadoop_rpc] where timestamp between '[past_10_minutes]' and '[past_5_minutes]' 
group by timestamp);
+replace into [user_util] (select timestamp, j.UserID as user, 
sum(j.NumOfMachines) as node_total, sum(cpu_idle_pcnt*j.NumOfMachines) as 
cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines) as cpu_used, 
avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt, 
sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
 as disk_unused, 
sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
 as disk_used, 
avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as 
disk_used_pcnt, sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused, 
sum(eth0_busy_pcnt*j.NumOfMachines) as network_used, avg(eth0_busy_pcnt) as 
network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines) as memory_unused, 
sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as 
memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID) 
and Timestamp between '[past_10_minutes]' and '[past_5_minutes]' group
  by j.UserID);
+#
+# Down sample metrics for charts
+replace into [system_metrics_month] (select 
timestamp,[group_avg(system_metrics)] from [system_metrics_week] where 
timestamp between '[past_15_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [system_metrics_quarter] (select 
timestamp,[group_avg(system_metrics)] from [system_metrics_month] where 
timestamp between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [system_metrics_year] (select 
timestamp,[group_avg(system_metrics)] from [system_metrics_quarter] where 
timestamp between '[past_540_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [system_metrics_decade] (select 
timestamp,[group_avg(system_metrics)] from [system_metrics_year] where 
timestamp between '[past_2160_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_namenode_month] (select timestamp,[group_avg(dfs_namenode)] 
from [dfs_namenode_week] where timestamp between '[past_15_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_namenode_quarter] (select 
timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_month] where timestamp 
between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_namenode_year] (select timestamp,[group_avg(dfs_namenode)] 
from [dfs_namenode_quarter] where timestamp between '[past_540_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_namenode_decade] (select timestamp,[group_avg(dfs_namenode)] 
from [dfs_namenode_year] where timestamp between '[past_2160_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_datanode_month] (select timestamp,[group_avg(dfs_datanode)] 
from [dfs_datanode_week] where timestamp between '[past_15_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_datanode_quarter] (select 
timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_month] where timestamp 
between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_datanode_year] (select timestamp,[group_avg(dfs_datanode)] 
from [dfs_datanode_quarter] where timestamp between '[past_540_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_datanode_decade] (select timestamp,[group_avg(dfs_datanode)] 
from [dfs_datanode_year] where timestamp between '[past_2160_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_rpc_month] (select timestamp,[group_avg(hadoop_rpc)] from 
[hadoop_rpc_week] where timestamp between '[past_15_minutes]' and '[now]' group 
by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_rpc_quarter] (select timestamp,[group_avg(hadoop_rpc)] 
from [hadoop_rpc_month] where timestamp between '[past_90_minutes]' and '[now]' 
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_rpc_year] (select timestamp,[group_avg(hadoop_rpc)] from 
[hadoop_rpc_quarter] where timestamp between '[past_540_minutes]' and '[now]' 
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_rpc_decade] (select timestamp,[group_avg(hadoop_rpc)] 
from [hadoop_rpc_year] where timestamp between '[past_2160_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [cluster_hadoop_rpc_month] (select 
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_week] where 
timestamp between '[past_15_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_hadoop_rpc_quarter] (select 
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_month] where 
timestamp between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_hadoop_rpc_year] (select 
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_quarter] where 
timestamp between '[past_540_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_hadoop_rpc_decade] (select 
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_year] where 
timestamp between '[past_2160_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hadoop_mapred_month] (select 
timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_week] where timestamp 
between '[past_15_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_mapred_quarter] (select 
timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_month] where timestamp 
between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_mapred_year] (select timestamp,[group_avg(hadoop_mapred)] 
from [hadoop_mapred_quarter] where timestamp between '[past_540_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_mapred_decade] (select 
timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_year] where timestamp 
between '[past_2160_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_jvm_month] (select timestamp,[group_avg(hadoop_jvm)] from 
[hadoop_jvm_week] where timestamp between '[past_15_minutes]' and '[now]' group 
by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,process_name);
+replace into [hadoop_jvm_quarter] (select timestamp,[group_avg(hadoop_jvm)] 
from [hadoop_jvm_month] where timestamp between '[past_90_minutes]' and '[now]' 
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,process_name);
+replace into [hadoop_jvm_year] (select timestamp,[group_avg(hadoop_jvm)] from 
[hadoop_jvm_quarter] where timestamp between '[past_540_minutes]' and '[now]' 
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,process_name);
+replace into [hadoop_jvm_decade] (select timestamp,[group_avg(hadoop_jvm)] 
from [hadoop_jvm_year] where timestamp between '[past_2160_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,process_name);
+#
+replace into [dfs_throughput_month] (select timestamp,[avg(dfs_throughput)] 
from [dfs_throughput_week] where timestamp between '[past_15_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [dfs_throughput_quarter] (select timestamp,[avg(dfs_throughput)] 
from [dfs_throughput_month] where timestamp between '[past_90_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [dfs_throughput_year] (select timestamp,[avg(dfs_throughput)] 
from [dfs_throughput_quarter] where timestamp between '[past_540_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [dfs_throughput_decade] (select timestamp,[avg(dfs_throughput)] 
from [dfs_throughput_year] where timestamp between '[past_2160_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [node_activity_month] (select timestamp,[avg(node_activity)] from 
[node_activity_week] where timestamp between '[past_15_minutes]' and '[now]' 
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [node_activity_quarter] (select timestamp,[avg(node_activity)] 
from [node_activity_month] where timestamp between '[past_90_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [node_activity_year] (select timestamp,[avg(node_activity)] from 
[node_activity_quarter] where timestamp between '[past_540_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [node_activity_decade] (select timestamp,[avg(node_activity)] 
from [node_activity_year] where timestamp between '[past_2160_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [dfs_fsnamesystem_month] (select 
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_week] where 
timestamp between '[past_15_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_fsnamesystem_quarter] (select 
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_month] where 
timestamp between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_fsnamesystem_year] (select 
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_quarter] where 
timestamp between '[past_540_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_fsnamesystem_decade] (select 
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_year] where 
timestamp between '[past_2160_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [disk_month] (select timestamp,[group_avg(disk)] from [disk_week] 
where timestamp between '[past_15_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,mount);
+replace into [disk_quarter] (select timestamp,[group_avg(disk)] from 
[disk_month] where timestamp between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,mount);
+replace into [disk_year] (select timestamp,[group_avg(disk)] from 
[disk_quarter] where timestamp between '[past_540_minutes]' and '[now]' group 
by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,mount);
+replace into [disk_decade] (select timestamp,[group_avg(disk)] from 
[disk_year] where timestamp between '[past_2160_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,mount);
+#
+replace into [cluster_disk_month] (select timestamp,[group_avg(cluster_disk)] 
from [cluster_disk_week] where timestamp between '[past_15_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),mount);
+replace into [cluster_disk_quarter] (select 
timestamp,[group_avg(cluster_disk)] from [cluster_disk_month] where timestamp 
between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),mount);
+replace into [cluster_disk_year] (select timestamp,[group_avg(cluster_disk)] 
from [cluster_disk_quarter] where timestamp between '[past_540_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),mount);
+replace into [cluster_disk_decade] (select timestamp,[group_avg(cluster_disk)] 
from [cluster_disk_year] where timestamp between '[past_2160_minutes]' and 
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),mount);
+#
+replace into [cluster_system_metrics_month] (select 
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_week] 
where timestamp between '[past_15_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_system_metrics_quarter] (select 
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_month] 
where timestamp between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_system_metrics_year] (select 
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_quarter] 
where timestamp between '[past_540_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_system_metrics_decade] (select 
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_year] 
where timestamp between '[past_2160_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hod_job_digest_month] (select 
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_week] where 
timestamp between '[past_15_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),HodID);
+replace into [hod_job_digest_quarter] (select 
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_month] where 
timestamp between '[past_90_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),HodID);
+replace into [hod_job_digest_year] (select 
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_quarter] where 
timestamp between '[past_540_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),HodID);
+replace into [hod_job_digest_decade] (select 
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_year] where 
timestamp between '[past_2160_minutes]' and '[now]' group by 
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),HodID);
+#
+replace into [user_util_month] (select timestamp,[group_avg(user_util)] from 
[user_util_week] where timestamp between '[past_15_minutes]' and '[now]' group 
by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),user);
+replace into [user_util_quarter] (select timestamp,[group_avg(user_util)] from 
[user_util_month] where timestamp between '[past_90_minutes]' and '[now]' group 
by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),user);
+replace into [user_util_year] (select timestamp,[group_avg(user_util)] from 
[user_util_quarter] where timestamp between '[past_540_minutes]' and '[now]' 
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),user);
+replace into [user_util_decade] (select timestamp,[group_avg(user_util)] from 
[user_util_year] where timestamp between '[past_2160_minutes]' and '[now]' 
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),user);
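
For readers unfamiliar with the template syntax above: the bracketed tokens are macros expanded by org.apache.hadoop.chukwa.database.Aggregator before the SQL is executed. [table] resolves to the current partitioned table (the _week/_month/_quarter/_year/_decade variants), [avg(table)] and [group_avg(table)] expand to per-column aggregate lists derived from the table's metadata, and [now]/[past_N_minutes] become timestamp bounds. A minimal sketch of the [past_N_minutes] arithmetic, assuming a "yyyy-MM-dd HH:mm:ss" format (the patch itself delegates formatting to DatabaseWriter.formatTimeStamp):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MacroTimeSketch {
    // Mirrors the rounding in the patched Aggregator.computeMacro(): align the
    // current time to an N-minute boundary, then step back one full period.
    public static String pastMinutes(String macro, long current) {
        Matcher m = Pattern.compile("past_(.*)_minutes").matcher(macro);
        if (!m.find()) {
            throw new IllegalArgumentException("not a past_*_minutes macro: " + macro);
        }
        int period = Integer.parseInt(m.group(1));
        long boundary = current - (current % (period * 60 * 1000L)) - (period * 60 * 1000L);
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(boundary));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(pastMinutes("past_10_minutes", now));
        System.out.println(pastMinutes("past_5_minutes", now));
    }
}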

Modified: hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables (original)
+++ hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables Fri Dec 19 17:11:42 2008
@@ -160,7 +160,7 @@
     sdc_busy_pcnt float,
     sdd_busy_pcnt float,
     swap_used_pcnt float,
-    primary key(host, timestamp),
+    primary key(timestamp),
     index (timestamp)
 );
 
@@ -571,7 +571,7 @@
     sdd_busy_pcnt float,
     swap_used_pcnt float,
     primary key(HodId, timestamp),
-    index(timeStamp)
+    index(timestamp)
 ); 
 
 create table if not exists user_util_template (
@@ -589,7 +589,9 @@
     network_used_pcnt float,
     memory_unused double,
     memory_used double,
-    memory_used_pcnt float
+    memory_used_pcnt float,
+    primary key(user, timestamp),
+    index(timestamp)
 );
 
 create table if not exists QueueInfo(
@@ -598,5 +600,5 @@
     Queue VARCHAR(20),
     NumOfMachine smallint unsigned,
     status varchar(1),
-    index(TimeStamp)
+    index(Timestamp)
 );

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java Fri Dec 19 17:11:42 2008
@@ -25,30 +25,37 @@
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
-
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.sql.DatabaseMetaData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.PidFile;
 
 public class Aggregator {
        private static DatabaseConfig dbc = null;
 
-       private static Log log = LogFactory.getLog(Consolidator.class);
+       private static Log log = LogFactory.getLog(Aggregator.class);
+       private String table = null;
+       private String jdbc = null;
+       private int[] intervals;
        private long current = 0;
-    private static PidFile loader=null;
-
-       public Aggregator() {
-
+    private static DatabaseWriter db = null;
+    public Aggregator() {
                dbc = new DatabaseConfig();
                Calendar now = Calendar.getInstance();
                current = now.getTimeInMillis();
        }
 
-       public HashMap<String,String> findMacros(String query) {
+       public HashMap<String,String> findMacros(String query) throws 
SQLException {
                boolean add=false;
                HashMap<String,String> macroList = new HashMap<String,String>();
                String macro="";
@@ -71,54 +78,131 @@
            return macroList;
        }
 
-       public String computeMacro(String macro) {
-               if(macro.indexOf("avg(")==0) {
+       public String computeMacro(String macro) throws SQLException {
+               Pattern p = Pattern.compile("past_(.*)_minutes");
+               Matcher matcher = p.matcher(macro);
+               if(macro.indexOf("avg(")==0 || macro.indexOf("group_avg(")==0) {
                        String meta="";
-                       String[] table = 
dbc.findTableName(macro.substring(4,macro.indexOf(")")), current, current);
+                       String[] table = 
dbc.findTableName(macro.substring(macro.indexOf("(")+1,macro.indexOf(")")), 
current, current);
                        try {
                                String cluster = System.getProperty("CLUSTER");
                                if(cluster==null) {
                                        cluster="unknown";
                                }
-                               DatabaseWriter db = new DatabaseWriter(cluster);
-
-                           String query = "select * from "+table[0]+" order by 
timestamp desc limit 1";
-                   log.debug("Query: "+query);
-                   ResultSet rs = db.query(query);
-                   if(rs==null) {
-                           throw new SQLException("Table is undefined.");
+                DatabaseMetaData dbMetaData = db.getConnection().getMetaData();
+                   ResultSet rs = dbMetaData.getColumns ( null,null,table[0], 
null);
+                   boolean first=true;
+                   while(rs.next()) {
+                       if(!first) {
+                               meta = meta+",";
+                       }
+                       String name = rs.getString(4);
+                       int type = rs.getInt(5);
+                       if(type==java.sql.Types.VARCHAR) {
+                               if(macro.indexOf("group_avg(")<0) {
+                                       meta=meta+"count("+name+") as "+name;
+                               } else {
+                                       meta=meta+name;
+                               }
+                               first=false;
+                       } else if(type==java.sql.Types.DOUBLE ||
+                                         type==java.sql.Types.FLOAT ||
+                                         type==java.sql.Types.INTEGER) {
+                               meta=meta+"avg("+name+")";
+                               first=false;
+                       } else if(type==java.sql.Types.TIMESTAMP) {
+                               // Skip the column
+                       } else {
+                               meta=meta+"AVG("+name+")";
+                               first=false;
+                       }
                    }
-                   ResultSetMetaData rmeta = rs.getMetaData();
-                   if(rs.next()) {
-                       boolean first=true;
-                       for(int i=1;i<=rmeta.getColumnCount();i++) {
-                               if(!first) {
-                                       meta=meta+",";
-                               }
-                               
if(rmeta.getColumnType(i)==java.sql.Types.VARCHAR) {
-                                       
meta=meta+"count("+rmeta.getColumnName(i)+") as "+rmeta.getColumnName(i);
-                                       first=false;
-                               } else 
if(rmeta.getColumnType(i)==java.sql.Types.DOUBLE || 
-                                                 
rmeta.getColumnType(i)==java.sql.Types.INTEGER || 
-                                                 
rmeta.getColumnType(i)==java.sql.Types.FLOAT) {
-                                       
meta=meta+"avg("+rmeta.getColumnName(i)+")";
-                                       first=false;
-                               } else 
if(rmeta.getColumnType(i)==java.sql.Types.TIMESTAMP) {
-                                       // Skip the column
-                               } else {
-                                       
meta=meta+"avg("+rmeta.getColumnName(i)+")";
-                                       first=false;                            
        
-                               }
-                           }
+                   if(first) {
+                           throw new SQLException("Table is undefined.");
                    }
                        } catch(SQLException ex) {
-                               log.error(ex);
+                               throw new SQLException("Table does not exist:"+ 
table[0]);
                        }
                        return meta;
                } else if(macro.indexOf("now")==0) {
+                       SimpleDateFormat sdf = new SimpleDateFormat();
                        return DatabaseWriter.formatTimeStamp(current);
+               } else if(matcher.find()) {
+                       int period = Integer.parseInt(matcher.group(1));
+                       long timestamp = current - (current % 
(period*60*1000L)) - (period*60*1000L);
+                       return DatabaseWriter.formatTimeStamp(timestamp);
                } else if(macro.indexOf("past_hour")==0) {
                        return 
DatabaseWriter.formatTimeStamp(current-3600*1000L);
+               } else if(macro.endsWith("_week")) {
+                       long partition = current / DatabaseConfig.WEEK;
+                       if(partition<=0) {
+                               partition=1;
+                       }
+                       String[] buffers = macro.split("_");
+                       StringBuffer tableName = new StringBuffer();
+                       for(int i=0;i<buffers.length-1;i++) {
+                               tableName.append(buffers[i]);
+                               tableName.append("_");
+                       }
+                       tableName.append(partition);
+                       tableName.append("_week");
+                       return tableName.toString();
+               } else if(macro.endsWith("_month")) {
+                       long partition = current / DatabaseConfig.MONTH;
+                       if(partition<=0) {
+                               partition=1;
+                       }
+                       String[] buffers = macro.split("_");
+                       StringBuffer tableName = new StringBuffer();
+                       for(int i=0;i<buffers.length-1;i++) {
+                               tableName.append(buffers[i]);
+                               tableName.append("_");
+                       }
+                       tableName.append(partition);
+                       tableName.append("_month");
+                       return tableName.toString();
+               } else if(macro.endsWith("_quarter")) {
+                       long partition = current / DatabaseConfig.QUARTER;
+                       if(partition<=0) {
+                               partition=1;
+                       }
+                       String[] buffers = macro.split("_");
+                       StringBuffer tableName = new StringBuffer();
+                       for(int i=0;i<buffers.length-1;i++) {
+                               tableName.append(buffers[i]);
+                               tableName.append("_");
+                       }
+                       tableName.append(partition);
+                       tableName.append("_quarter");
+                       return tableName.toString();
+               } else if(macro.endsWith("_year")) {
+                       long partition = current / DatabaseConfig.YEAR;
+                       if(partition<=0) {
+                               partition=1;
+                       }
+                       String[] buffers = macro.split("_");
+                       StringBuffer tableName = new StringBuffer();
+                       for(int i=0;i<buffers.length-1;i++) {
+                               tableName.append(buffers[i]);
+                               tableName.append("_");
+                       }
+                       tableName.append(partition);
+                       tableName.append("_year");
+                       return tableName.toString();
+               } else if(macro.endsWith("_decade")) {
+                       long partition = current / DatabaseConfig.DECADE;
+                       if(partition<=0) {
+                               partition=1;
+                       }
+                       String[] buffers = macro.split("_");
+                       StringBuffer tableName = new StringBuffer();
+                       for(int i=0;i<buffers.length-1;i++) {
+                               tableName.append(buffers[i]);
+                               tableName.append("_");
+                       }
+                       tableName.append(partition);
+                       tableName.append("_decade");
+                       return tableName.toString();
                }
                String[] tableList = dbc.findTableName(macro,current,current);
                return tableList[0];
@@ -143,62 +227,51 @@
         return contents.toString();
     }
 
-       public void process(String table, String query) {
+       public void process(String query) {
                ResultSet rs = null;
+               String[] columns;
+               int[] columnsType;
+        String groupBy = "";
            long start = current;
            long end = current;
         
-               String cluster = System.getProperty("CLUSTER");
-               if(cluster==null) {
-                       cluster="unknown";
+
+               try {
+            HashMap<String, String> macroList = findMacros(query);
+            Iterator<String> macroKeys = macroList.keySet().iterator();
+            while(macroKeys.hasNext()) {
+                   String mkey = macroKeys.next();
+                   log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
+                   query = query.replace("["+mkey+"]", macroList.get(mkey));
+            }
+            db.execute(query);
+               } catch(SQLException e) {
+                   log.error(query);
+                       log.error(e.getMessage());
                }
-           DatabaseWriter db = new DatabaseWriter(cluster);
-                           // Find the last aggregated value from table
-                           String[] tmpList = 
dbc.findTableName(table,start,end);
-                           String timeTest = "select timestamp from 
"+tmpList[0]+" order by timestamp desc limit 1";
-                           try {
-                                       rs = db.query(timeTest);
-                                   while(rs.next()) {
-                                       start=rs.getTimestamp(1).getTime();
-                                       end=start;
-                                   }
-                           } catch (SQLException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
-                               }
-                           // Transform table names
-                HashMap<String, String> macroList = findMacros(query);
-                Iterator<String> macroKeys = macroList.keySet().iterator();
-                while(macroKeys.hasNext()) {
-                       String mkey = macroKeys.next();
-                       log.debug("replacing:"+mkey+" with 
"+macroList.get(mkey));
-                               query = query.replace("["+mkey+"]", 
macroList.get(mkey));
-                }
-                               log.info(query);
-                db.execute(query);
-            db.close();
        }
 
     public static void main(String[] args) {
-        loader=new PidFile(System.getProperty("CLUSTER")+"Aggregator");
-       dbc = new DatabaseConfig();     
+        log.info("Aggregator started.");
+       dbc = new DatabaseConfig();
+               String cluster = System.getProperty("CLUSTER");
+               if(cluster==null) {
+                       cluster="unknown";
+               }
+       db = new DatabaseWriter(cluster);
        String queries = Aggregator.getContents(new 
File(System.getenv("CHUKWA_CONF_DIR")+File.separator+"aggregator.sql"));
        String[] query = queries.split("\n");
        for(int i=0;i<query.length;i++) {
-                   int startOffset = query[i].indexOf("[")+1;
-                   int endOffset = query[i].indexOf("]");
                    if(query[i].equals("")) {
-                   } else if(startOffset==-1 || endOffset==-1) {
-                       log.error("Unable to extract table name from 
query:"+query[i]);
                    } else if(query[i].indexOf("#")==0) {
                        log.debug("skipping: "+query[i]);
                    } else {
-                       String table = query[i].substring(startOffset, 
endOffset);
                        Aggregator dba = new Aggregator();
-                       dba.process(table, query[i]);
+                       dba.process(query[i]);
                    }
         }
-        loader.clean();
+        db.close();
+       log.info("Aggregator finished.");
     }
 
 }
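
As a usage illustration of the reworked process() above: each non-comment line of aggregator.sql is treated as a template, findMacros() computes a value for every bracketed token, and the tokens are replaced with String.replace() before the statement is handed to DatabaseWriter.execute(). A self-contained sketch of that substitution step, with made-up macro values standing in for what computeMacro() would return:

import java.util.HashMap;
import java.util.Map;

public class MacroSubstitutionSketch {
    public static void main(String[] args) {
        String template = "replace into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] "
            + "from [dfs_datanode] where timestamp between '[past_10_minutes]' and "
            + "'[past_5_minutes]' group by timestamp);";
        // In the real Aggregator these values come from computeMacro(); the ones
        // below are placeholders for illustration only.
        Map<String, String> macros = new HashMap<String, String>();
        macros.put("dfs_throughput", "dfs_throughput_2028_week");
        macros.put("dfs_datanode", "dfs_datanode_2028_week");
        macros.put("avg(dfs_datanode)", "avg(bytes_read),avg(bytes_written)");
        macros.put("past_10_minutes", "2008-12-19 17:00:00");
        macros.put("past_5_minutes", "2008-12-19 17:05:00");
        String query = template;
        for (Map.Entry<String, String> entry : macros.entrySet()) {
            query = query.replace("[" + entry.getKey() + "]", entry.getValue());
        }
        System.out.println(query);  // the expanded SQL that would be executed
    }
}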

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java Fri Dec 19 17:11:42 2008
@@ -27,6 +27,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 
 public class DatabaseWriter {
     private static Log log = LogFactory.getLog(DatabaseWriter.class);
@@ -35,26 +36,25 @@
     private ResultSet rs = null;
 
     public DatabaseWriter(String host, String user, String password) {
-       String jdbc_url = System.getenv("JDBC_URL_PREFIX")+host+"/";
-       
-               if(user!=null) {
+       DataConfig mdlConfig = new DataConfig();
+       String jdbc_url = "jdbc:mysql://"+host+"/";
+        if(user!=null) {
             jdbc_url = jdbc_url + "?user=" + user;
             if(password!=null) {
                 jdbc_url = jdbc_url + "&password=" + password;
             }
-               }
+        }
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-            String jdbcDriver = System.getenv("JDBC_DRIVER");
-            Class.forName(jdbcDriver).newInstance();
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
         }
         try {
             conn = DriverManager.getConnection(jdbc_url);
-            log.info("Initialized JDBC URL: "+jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
         } catch (SQLException ex) {
             log.error(ex,ex);
         }
@@ -66,20 +66,43 @@
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-               String jdbcDriver = System.getenv("JDBC_DRIVER");
-            Class.forName(jdbcDriver).newInstance();
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
         }
         try {
             conn = DriverManager.getConnection(jdbc_url);
-            log.info("Initialized JDBC URL: "+jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
         } catch (SQLException ex) {
             log.error(ex,ex);
         }
     }
     
+    public DatabaseWriter() {
+       DataConfig mdlConfig = new DataConfig();
+       String jdbc_url = 
"jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
+        if(mdlConfig.get("jdbc.user")!=null) {
+            jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+            if(mdlConfig.get("jdbc.password")!=null) {
+                jdbc_url = jdbc_url + "&password=" + 
mdlConfig.get("jdbc.password");
+            }
+        }
+        try {
+            // The newInstance() call is a work around for some
+            // broken Java implementations
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
+        } catch (Exception ex) {
+            // handle the error
+            log.error(ex,ex);
+        }
+        try {
+            conn = DriverManager.getConnection(jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
+        } catch (SQLException ex) {
+            log.error(ex,ex);
+        }
+    }
     public void execute(String query) {
         try {
             stmt = conn.createStatement(); 
@@ -102,6 +125,9 @@
             }
         }
     }
+    public Connection getConnection() {
+       return conn;
+    }
     public ResultSet query(String query) throws SQLException {
         try {
             stmt = conn.createStatement(); 
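
A short usage sketch of the two DatabaseWriter additions above, the no-argument constructor (which builds the JDBC URL from the mdl DataConfig's jdbc.* properties) and getConnection(); the table name below is hypothetical and a reachable MySQL instance is assumed:

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.chukwa.util.DatabaseWriter;

public class DatabaseWriterSketch {
    public static void main(String[] args) throws SQLException {
        DatabaseWriter db = new DatabaseWriter();
        // getConnection() exposes the underlying JDBC connection, which is what
        // lets Aggregator.computeMacro() read column metadata for [avg(...)] macros.
        DatabaseMetaData meta = db.getConnection().getMetaData();
        ResultSet columns = meta.getColumns(null, null, "system_metrics", null);
        while (columns.next()) {
            // Column 4 is COLUMN_NAME, column 5 is DATA_TYPE (java.sql.Types).
            System.out.println(columns.getString(4) + " -> " + columns.getInt(5));
        }
        db.close();
    }
}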

