Author: cdouglas
Date: Fri Dec 19 17:11:42 2008
New Revision: 728206
URL: http://svn.apache.org/viewvc?rev=728206&view=rev
Log:
HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa. Contributed
by Eric Yang
Removed:
hadoop/core/trunk/src/contrib/chukwa/conf/mysql_create_tables
hadoop/core/trunk/src/contrib/chukwa/conf/mysql_upgrade_tables
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh
hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql
hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec 19 17:11:42 2008
@@ -481,6 +481,9 @@
HADOOP-4849. Documentation for Service Level Authorization implemented in
HADOOP-4348. (acmurthy)
+ HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa (Eric
+ Yang via cdouglas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh (original)
+++ hadoop/core/trunk/src/contrib/chukwa/bin/dbAdmin.sh Fri Dec 19 17:11:42 2008
@@ -37,18 +37,17 @@
cat ${CHUKWA_CONF_DIR}/jdbc.conf | \
while read LINE; do
CLUSTER=`echo ${LINE} | cut -f 1 -d'='`
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Consolidator
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365
- ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365 &
+ ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650 &
done
end=`date +%s`
duration=$(( $end - $start ))
Modified: hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql (original)
+++ hadoop/core/trunk/src/contrib/chukwa/conf/aggregator.sql Fri Dec 19 17:11:42 2008
@@ -1,12 +1,88 @@
-insert into [cluster_system_metrics] (select timestamp,[avg(system_metrics)]
from [system_metrics] where timestamp between '[past_hour]' and '[now]' group
by timestamp);
-insert into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from
[dfs_datanode] where timestamp between '[past_hour]' and '[now]' group by
timestamp);
-insert into [cluster_disk] (select
a.timestamp,a.mount,a.used,a.available,a.used_percent from (select
from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as
timestamp,mount,avg(used) as used,avg(available) as available,avg(used_percent)
as used_percent from [disk] where timestamp between '[past_hour]' and '[now]'
group by timestamp,mount) as a group by a.timestamp, a.mount);
-insert into [hod_job_digest] (select
timestamp,d.hodid,d.userid,[avg(system_metrics)] from (select a.HodID,b.host as
machine,a.userid,a.starttime,a.endtime from [HodJob] a join [hod_machine] b on
(a.HodID = b.HodID) where endtime between '[past_hour]' and '[now]') as
d,[system_metrics] where timestamp between d.starttime and d.endtime and
host=d.machine group by hodid,timestamp);
-insert into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from
[hadoop_rpc] where timestamp between '[past_hour]' and '[now]' group by
timestamp);
-#insert into [cluster_hadoop_mapred] (select
timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job] where timestamp
between '[past_hour]' and '[now]' group by timestamp);
-insert into [user_util] (select timestamp, j.UserID as user,
sum(j.NumOfMachines) as node_total, sum(cpu_idle_pcnt*j.NumOfMachines) as
cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines) as cpu_used,
avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt,
sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_unused,
sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_used,
avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as
disk_used_pcnt, sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused,
sum(eth0_busy_pcnt*j.NumOfMachines) as network_used, avg(eth0_busy_pcnt) as
network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines) as memory_unused,
sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as
memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID)
and Timestamp between '[past_hour]' and '[now]' group by j.UserID);
#insert into [node_util] select starttime, avg(unused) as unused, avg(used) as
used from (select DATE_FORMAT(m.LAUNCH_TIME,'%Y-%m-%d %H:%i:%s') as
starttime,sum(AvgCPUBusy*j.NumOfMachines/(60*100)) as
unused,sum((100-AvgCPUBusy)*j.NumOfMachines/(60*100)) as used from HodJobDigest
d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID)
where m.LAUNCH_TIME >= '2008-09-12 21:11' and m.LAUNCH_TIME <= '2008-09-12
22:11' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group
by m.MRJobID order by m.LAUNCH_TIME) as t group by t.starttime
#insert into [jobtype_util] select CASE WHEN MRJobName like 'PigLatin%' THEN
'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like
'%abacus%' THEN 'Abacus' ELSE 'Other' END as m, count(*)*j.NumOfMachines/60 as
nodehours,count(distinct(MRJobID)) as jobs from HodJobDigest d join HodJob j on
(d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >=
'2008-09-12 21:11' and d.Timestamp <= '2008-09-12 22:11' and d.Timestamp >=
m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by CASE WHEN MRJobName
like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming'
WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END order by CASE
WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%'
THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END
-#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy *
j.NumOfMachines) / (sum(j.NumOfMachines) * 1)) as used from Digest d join
HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_hour]' and
d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp
-#insert into [b] select m, sum(foo.nodehours) as nodehours from (select
m.MRJobID, round(avg(if(AvgCPUBusy is null,0,AvgCPUBusy)),0) as m,
count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest d join HodJob j on
(d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >=
'[past_hour]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME and
d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m;
-#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN
MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN
'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END as
interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as
jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on
(m.HodID = j.HodID) where d.Timestamp >= '[past_hour]' and d.Timestamp <=
'[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group
by AvgCPUBusy,CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName
like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus'
ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy *
j.NumOfMachines) / (sum(j.NumOfMachines) * 1)) as used from Digest d join
HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]' and
d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp
+#insert into [b] select m, sum(foo.nodehours) as nodehours from (select
m.MRJobID, round(avg(if(AvgCPUBusy is null,0,AvgCPUBusy)),0) as m,
count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest d join HodJob j on
(d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >=
'[past_10_minutes]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME
and d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m;
+#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN
MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN
'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END as
interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as
jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on
(m.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]' and d.Timestamp <=
'[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group
by AvgCPUBusy,CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName
like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus'
ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [cluster_hadoop_mapred] (select
timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job] where timestamp
between '[past_10_minutes]' and '[now]' group by timestamp);
+replace into [cluster_system_metrics] (select timestamp,[avg(system_metrics)]
from [system_metrics] where timestamp between '[past_10_minutes]' and
'[past_5_minutes]' group by timestamp);
+replace into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from
[dfs_datanode] where timestamp between '[past_10_minutes]' and
'[past_5_minutes]' group by timestamp);
+replace into [cluster_disk] (select
a.timestamp,a.mount,a.used,a.available,a.used_percent from (select
from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as
timestamp,mount,avg(used) as used,avg(available) as available,avg(used_percent)
as used_percent from [disk] where timestamp between '[past_10_minutes]' and
'[past_5_minutes]' group by timestamp,mount) as a group by a.timestamp,
a.mount);
+replace delayed into [hod_job_digest] (select
timestamp,d.hodid,d.userid,[avg(system_metrics)] from (select a.HodID,b.host as
machine,a.userid,a.starttime,a.endtime from [HodJob] a join [hod_machine] b on
(a.HodID = b.HodID) where endtime between '[past_10_minutes]' and
'[past_5_minutes]') as d,[system_metrics] where timestamp between d.starttime
and d.endtime and host=d.machine group by hodid,timestamp);
+replace into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from
[hadoop_rpc] where timestamp between '[past_10_minutes]' and '[past_5_minutes]'
group by timestamp);
+replace into [user_util] (select timestamp, j.UserID as user,
sum(j.NumOfMachines) as node_total, sum(cpu_idle_pcnt*j.NumOfMachines) as
cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines) as cpu_used,
avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt,
sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_unused,
sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_used,
avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as
disk_used_pcnt, sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused,
sum(eth0_busy_pcnt*j.NumOfMachines) as network_used, avg(eth0_busy_pcnt) as
network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines) as memory_unused,
sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as
memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID)
and Timestamp between '[past_10_minutes]' and '[past_5_minutes]' group
by j.UserID);
+#
+# Down sample metrics for charts
+replace into [system_metrics_month] (select
timestamp,[group_avg(system_metrics)] from [system_metrics_week] where
timestamp between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [system_metrics_quarter] (select
timestamp,[group_avg(system_metrics)] from [system_metrics_month] where
timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [system_metrics_year] (select
timestamp,[group_avg(system_metrics)] from [system_metrics_quarter] where
timestamp between '[past_540_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [system_metrics_decade] (select
timestamp,[group_avg(system_metrics)] from [system_metrics_year] where
timestamp between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_namenode_month] (select timestamp,[group_avg(dfs_namenode)]
from [dfs_namenode_week] where timestamp between '[past_15_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_namenode_quarter] (select
timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_month] where timestamp
between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_namenode_year] (select timestamp,[group_avg(dfs_namenode)]
from [dfs_namenode_quarter] where timestamp between '[past_540_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_namenode_decade] (select timestamp,[group_avg(dfs_namenode)]
from [dfs_namenode_year] where timestamp between '[past_2160_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_datanode_month] (select timestamp,[group_avg(dfs_datanode)]
from [dfs_datanode_week] where timestamp between '[past_15_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_datanode_quarter] (select
timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_month] where timestamp
between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_datanode_year] (select timestamp,[group_avg(dfs_datanode)]
from [dfs_datanode_quarter] where timestamp between '[past_540_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_datanode_decade] (select timestamp,[group_avg(dfs_datanode)]
from [dfs_datanode_year] where timestamp between '[past_2160_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_rpc_month] (select timestamp,[group_avg(hadoop_rpc)] from
[hadoop_rpc_week] where timestamp between '[past_15_minutes]' and '[now]' group
by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_rpc_quarter] (select timestamp,[group_avg(hadoop_rpc)]
from [hadoop_rpc_month] where timestamp between '[past_90_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_rpc_year] (select timestamp,[group_avg(hadoop_rpc)] from
[hadoop_rpc_quarter] where timestamp between '[past_540_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_rpc_decade] (select timestamp,[group_avg(hadoop_rpc)]
from [hadoop_rpc_year] where timestamp between '[past_2160_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [cluster_hadoop_rpc_month] (select
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_week] where
timestamp between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_hadoop_rpc_quarter] (select
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_month] where
timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_hadoop_rpc_year] (select
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_quarter] where
timestamp between '[past_540_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_hadoop_rpc_decade] (select
timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_year] where
timestamp between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hadoop_mapred_month] (select
timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_week] where timestamp
between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_mapred_quarter] (select
timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_month] where timestamp
between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_mapred_year] (select timestamp,[group_avg(hadoop_mapred)]
from [hadoop_mapred_quarter] where timestamp between '[past_540_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_mapred_decade] (select
timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_year] where timestamp
between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_jvm_month] (select timestamp,[group_avg(hadoop_jvm)] from
[hadoop_jvm_week] where timestamp between '[past_15_minutes]' and '[now]' group
by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,process_name);
+replace into [hadoop_jvm_quarter] (select timestamp,[group_avg(hadoop_jvm)]
from [hadoop_jvm_month] where timestamp between '[past_90_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,process_name);
+replace into [hadoop_jvm_year] (select timestamp,[group_avg(hadoop_jvm)] from
[hadoop_jvm_quarter] where timestamp between '[past_540_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,process_name);
+replace into [hadoop_jvm_decade] (select timestamp,[group_avg(hadoop_jvm)]
from [hadoop_jvm_year] where timestamp between '[past_2160_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,process_name);
+#
+replace into [dfs_throughput_month] (select timestamp,[avg(dfs_throughput)]
from [dfs_throughput_week] where timestamp between '[past_15_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [dfs_throughput_quarter] (select timestamp,[avg(dfs_throughput)]
from [dfs_throughput_month] where timestamp between '[past_90_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [dfs_throughput_year] (select timestamp,[avg(dfs_throughput)]
from [dfs_throughput_quarter] where timestamp between '[past_540_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [dfs_throughput_decade] (select timestamp,[avg(dfs_throughput)]
from [dfs_throughput_year] where timestamp between '[past_2160_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [node_activity_month] (select timestamp,[avg(node_activity)] from
[node_activity_week] where timestamp between '[past_15_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [node_activity_quarter] (select timestamp,[avg(node_activity)]
from [node_activity_month] where timestamp between '[past_90_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [node_activity_year] (select timestamp,[avg(node_activity)] from
[node_activity_quarter] where timestamp between '[past_540_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [node_activity_decade] (select timestamp,[avg(node_activity)]
from [node_activity_year] where timestamp between '[past_2160_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [dfs_fsnamesystem_month] (select
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_week] where
timestamp between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_fsnamesystem_quarter] (select
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_month] where
timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_fsnamesystem_year] (select
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_quarter] where
timestamp between '[past_540_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_fsnamesystem_decade] (select
timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_year] where
timestamp between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [disk_month] (select timestamp,[group_avg(disk)] from [disk_week]
where timestamp between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,mount);
+replace into [disk_quarter] (select timestamp,[group_avg(disk)] from
[disk_month] where timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,mount);
+replace into [disk_year] (select timestamp,[group_avg(disk)] from
[disk_quarter] where timestamp between '[past_540_minutes]' and '[now]' group
by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,mount);
+replace into [disk_decade] (select timestamp,[group_avg(disk)] from
[disk_year] where timestamp between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,mount);
+#
+replace into [cluster_disk_month] (select timestamp,[group_avg(cluster_disk)]
from [cluster_disk_week] where timestamp between '[past_15_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),mount);
+replace into [cluster_disk_quarter] (select
timestamp,[group_avg(cluster_disk)] from [cluster_disk_month] where timestamp
between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),mount);
+replace into [cluster_disk_year] (select timestamp,[group_avg(cluster_disk)]
from [cluster_disk_quarter] where timestamp between '[past_540_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),mount);
+replace into [cluster_disk_decade] (select timestamp,[group_avg(cluster_disk)]
from [cluster_disk_year] where timestamp between '[past_2160_minutes]' and
'[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),mount);
+#
+replace into [cluster_system_metrics_month] (select
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_week]
where timestamp between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_system_metrics_quarter] (select
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_month]
where timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_system_metrics_year] (select
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_system_metrics_decade] (select
timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hod_job_digest_month] (select
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_week] where
timestamp between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300),HodID);
+replace into [hod_job_digest_quarter] (select
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_month] where
timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),HodID);
+replace into [hod_job_digest_year] (select
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_quarter] where
timestamp between '[past_540_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),HodID);
+replace into [hod_job_digest_decade] (select
timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_year] where
timestamp between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),HodID);
+#
+replace into [user_util_month] (select timestamp,[group_avg(user_util)] from
[user_util_week] where timestamp between '[past_15_minutes]' and '[now]' group
by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),user);
+replace into [user_util_quarter] (select timestamp,[group_avg(user_util)] from
[user_util_month] where timestamp between '[past_90_minutes]' and '[now]' group
by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),user);
+replace into [user_util_year] (select timestamp,[group_avg(user_util)] from
[user_util_quarter] where timestamp between '[past_540_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),user);
+replace into [user_util_decade] (select timestamp,[group_avg(user_util)] from
[user_util_year] where timestamp between '[past_2160_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),user);
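
The bracketed tokens in the statements above ([cluster_system_metrics], [avg(system_metrics)], [group_avg(disk)], [past_10_minutes], and so on) are the macros this patch teaches Aggregator to expand at run time into partitioned table names, per-column average/count lists, and timestamp literals, while lines beginning with # are skipped as comments. As a rough, stand-alone illustration only (the real logic lives in Aggregator.computeMacro in the diff below; resolveMacro here is a hypothetical stand-in, and the sample values are invented), the bracket substitution amounts to:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Sketch of per-line macro substitution over aggregator.sql templates.
    // resolveMacro() is a hypothetical placeholder for Aggregator.computeMacro().
    public class MacroExpansionSketch {
        private static final Pattern MACRO = Pattern.compile("\\[([^\\]]+)\\]");

        static String expand(String template) {
            Matcher m = MACRO.matcher(template);
            StringBuffer sql = new StringBuffer();
            while (m.find()) {
                // replace each [macro] with its computed value
                m.appendReplacement(sql, Matcher.quoteReplacement(resolveMacro(m.group(1))));
            }
            m.appendTail(sql);
            return sql.toString();
        }

        // Illustrative values only; the real code derives these from DatabaseConfig,
        // table metadata, and the current time.
        static String resolveMacro(String macro) {
            if (macro.equals("now")) return "2008-12-19 17:10:00";
            if (macro.equals("past_5_minutes")) return "2008-12-19 17:05:00";
            if (macro.equals("past_10_minutes")) return "2008-12-19 17:00:00";
            return macro; // table names, avg(...) column lists, etc.
        }
    }
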
Modified: hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables (original)
+++ hadoop/core/trunk/src/contrib/chukwa/conf/database_create_tables Fri Dec 19 17:11:42 2008
@@ -160,7 +160,7 @@
sdc_busy_pcnt float,
sdd_busy_pcnt float,
swap_used_pcnt float,
- primary key(host, timestamp),
+ primary key(timestamp),
index (timestamp)
);
@@ -571,7 +571,7 @@
sdd_busy_pcnt float,
swap_used_pcnt float,
primary key(HodId, timestamp),
- index(timeStamp)
+ index(timestamp)
);
create table if not exists user_util_template (
@@ -589,7 +589,9 @@
network_used_pcnt float,
memory_unused double,
memory_used double,
- memory_used_pcnt float
+ memory_used_pcnt float,
+ primary key(user, timestamp),
+ index(timestamp)
);
create table if not exists QueueInfo(
@@ -598,5 +600,5 @@
Queue VARCHAR(20),
NumOfMachine smallint unsigned,
status varchar(1),
- index(TimeStamp)
+ index(Timestamp)
);
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java Fri Dec 19 17:11:42 2008
@@ -25,30 +25,37 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
-
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.sql.DatabaseMetaData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.PidFile;
public class Aggregator {
private static DatabaseConfig dbc = null;
- private static Log log = LogFactory.getLog(Consolidator.class);
+ private static Log log = LogFactory.getLog(Aggregator.class);
+ private String table = null;
+ private String jdbc = null;
+ private int[] intervals;
private long current = 0;
- private static PidFile loader=null;
-
- public Aggregator() {
-
+ private static DatabaseWriter db = null;
+ public Aggregator() {
dbc = new DatabaseConfig();
Calendar now = Calendar.getInstance();
current = now.getTimeInMillis();
}
- public HashMap<String,String> findMacros(String query) {
+      public HashMap<String,String> findMacros(String query) throws SQLException {
boolean add=false;
HashMap<String,String> macroList = new HashMap<String,String>();
String macro="";
@@ -71,54 +78,131 @@
return macroList;
}
- public String computeMacro(String macro) {
- if(macro.indexOf("avg(")==0) {
+ public String computeMacro(String macro) throws SQLException {
+ Pattern p = Pattern.compile("past_(.*)_minutes");
+ Matcher matcher = p.matcher(macro);
+ if(macro.indexOf("avg(")==0 || macro.indexOf("group_avg(")==0) {
String meta="";
- String[] table = dbc.findTableName(macro.substring(4,macro.indexOf(")")), current, current);
+ String[] table = dbc.findTableName(macro.substring(macro.indexOf("(")+1,macro.indexOf(")")), current, current);
try {
String cluster = System.getProperty("CLUSTER");
if(cluster==null) {
cluster="unknown";
}
- DatabaseWriter db = new DatabaseWriter(cluster);
-
- String query = "select * from "+table[0]+" order by timestamp desc limit 1";
- log.debug("Query: "+query);
- ResultSet rs = db.query(query);
- if(rs==null) {
- throw new SQLException("Table is undefined.");
+ DatabaseMetaData dbMetaData = db.getConnection().getMetaData();
+ ResultSet rs = dbMetaData.getColumns ( null,null,table[0], null);
+ boolean first=true;
+ while(rs.next()) {
+ if(!first) {
+ meta = meta+",";
+ }
+ String name = rs.getString(4);
+ int type = rs.getInt(5);
+ if(type==java.sql.Types.VARCHAR) {
+ if(macro.indexOf("group_avg(")<0) {
+ meta=meta+"count("+name+") as "+name;
+ } else {
+ meta=meta+name;
+ }
+ first=false;
+ } else if(type==java.sql.Types.DOUBLE ||
+ type==java.sql.Types.FLOAT ||
+ type==java.sql.Types.INTEGER) {
+ meta=meta+"avg("+name+")";
+ first=false;
+ } else if(type==java.sql.Types.TIMESTAMP) {
+ // Skip the column
+ } else {
+ meta=meta+"AVG("+name+")";
+ first=false;
+ }
}
- ResultSetMetaData rmeta = rs.getMetaData();
- if(rs.next()) {
- boolean first=true;
- for(int i=1;i<=rmeta.getColumnCount();i++) {
- if(!first) {
- meta=meta+",";
- }
- if(rmeta.getColumnType(i)==java.sql.Types.VARCHAR) {
- meta=meta+"count("+rmeta.getColumnName(i)+") as "+rmeta.getColumnName(i);
- first=false;
- } else if(rmeta.getColumnType(i)==java.sql.Types.DOUBLE ||
- rmeta.getColumnType(i)==java.sql.Types.INTEGER ||
- rmeta.getColumnType(i)==java.sql.Types.FLOAT) {
- meta=meta+"avg("+rmeta.getColumnName(i)+")";
- first=false;
- } else if(rmeta.getColumnType(i)==java.sql.Types.TIMESTAMP) {
- // Skip the column
- } else {
- meta=meta+"avg("+rmeta.getColumnName(i)+")";
- first=false;
- }
- }
+ if(first) {
+ throw new SQLException("Table is undefined.");
}
} catch(SQLException ex) {
- log.error(ex);
+ throw new SQLException("Table does not exist:"+ table[0]);
}
return meta;
} else if(macro.indexOf("now")==0) {
+ SimpleDateFormat sdf = new SimpleDateFormat();
return DatabaseWriter.formatTimeStamp(current);
+ } else if(matcher.find()) {
+ int period = Integer.parseInt(matcher.group(1));
+ long timestamp = current - (current % (period*60*1000L)) - (period*60*1000L);
+ return DatabaseWriter.formatTimeStamp(timestamp);
} else if(macro.indexOf("past_hour")==0) {
return DatabaseWriter.formatTimeStamp(current-3600*1000L);
+ } else if(macro.endsWith("_week")) {
+ long partition = current / DatabaseConfig.WEEK;
+ if(partition<=0) {
+ partition=1;
+ }
+ String[] buffers = macro.split("_");
+ StringBuffer tableName = new StringBuffer();
+ for(int i=0;i<buffers.length-1;i++) {
+ tableName.append(buffers[i]);
+ tableName.append("_");
+ }
+ tableName.append(partition);
+ tableName.append("_week");
+ return tableName.toString();
+ } else if(macro.endsWith("_month")) {
+ long partition = current / DatabaseConfig.MONTH;
+ if(partition<=0) {
+ partition=1;
+ }
+ String[] buffers = macro.split("_");
+ StringBuffer tableName = new StringBuffer();
+ for(int i=0;i<buffers.length-1;i++) {
+ tableName.append(buffers[i]);
+ tableName.append("_");
+ }
+ tableName.append(partition);
+ tableName.append("_month");
+ return tableName.toString();
+ } else if(macro.endsWith("_quarter")) {
+ long partition = current / DatabaseConfig.QUARTER;
+ if(partition<=0) {
+ partition=1;
+ }
+ String[] buffers = macro.split("_");
+ StringBuffer tableName = new StringBuffer();
+ for(int i=0;i<buffers.length-1;i++) {
+ tableName.append(buffers[i]);
+ tableName.append("_");
+ }
+ tableName.append(partition);
+ tableName.append("_quarter");
+ return tableName.toString();
+ } else if(macro.endsWith("_year")) {
+ long partition = current / DatabaseConfig.YEAR;
+ if(partition<=0) {
+ partition=1;
+ }
+ String[] buffers = macro.split("_");
+ StringBuffer tableName = new StringBuffer();
+ for(int i=0;i<buffers.length-1;i++) {
+ tableName.append(buffers[i]);
+ tableName.append("_");
+ }
+ tableName.append(partition);
+ tableName.append("_year");
+ return tableName.toString();
+ } else if(macro.endsWith("_decade")) {
+ long partition = current / DatabaseConfig.DECADE;
+ if(partition<=0) {
+ partition=1;
+ }
+ String[] buffers = macro.split("_");
+ StringBuffer tableName = new StringBuffer();
+ for(int i=0;i<buffers.length-1;i++) {
+ tableName.append(buffers[i]);
+ tableName.append("_");
+ }
+ tableName.append(partition);
+ tableName.append("_decade");
+ return tableName.toString();
}
String[] tableList = dbc.findTableName(macro,current,current);
return tableList[0];
@@ -143,62 +227,51 @@
return contents.toString();
}
- public void process(String table, String query) {
+ public void process(String query) {
ResultSet rs = null;
+ String[] columns;
+ int[] columnsType;
+ String groupBy = "";
long start = current;
long end = current;
- String cluster = System.getProperty("CLUSTER");
- if(cluster==null) {
- cluster="unknown";
+
+ try {
+ HashMap<String, String> macroList = findMacros(query);
+ Iterator<String> macroKeys = macroList.keySet().iterator();
+ while(macroKeys.hasNext()) {
+ String mkey = macroKeys.next();
+ log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
+ query = query.replace("["+mkey+"]", macroList.get(mkey));
+ }
+ db.execute(query);
+ } catch(SQLException e) {
+ log.error(query);
+ log.error(e.getMessage());
}
- DatabaseWriter db = new DatabaseWriter(cluster);
- // Find the last aggregated value from table
- String[] tmpList = dbc.findTableName(table,start,end);
- String timeTest = "select timestamp from "+tmpList[0]+" order by timestamp desc limit 1";
- try {
- rs = db.query(timeTest);
- while(rs.next()) {
- start=rs.getTimestamp(1).getTime();
- end=start;
- }
- } catch (SQLException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- // Transform table names
- HashMap<String, String> macroList = findMacros(query);
- Iterator<String> macroKeys = macroList.keySet().iterator();
- while(macroKeys.hasNext()) {
- String mkey = macroKeys.next();
- log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
- query = query.replace("["+mkey+"]", macroList.get(mkey));
- }
- log.info(query);
- db.execute(query);
- db.close();
}
public static void main(String[] args) {
- loader=new PidFile(System.getProperty("CLUSTER")+"Aggregator");
- dbc = new DatabaseConfig();
+ log.info("Aggregator started.");
+ dbc = new DatabaseConfig();
+ String cluster = System.getProperty("CLUSTER");
+ if(cluster==null) {
+ cluster="unknown";
+ }
+ db = new DatabaseWriter(cluster);
String queries = Aggregator.getContents(new File(System.getenv("CHUKWA_CONF_DIR")+File.separator+"aggregator.sql"));
String[] query = queries.split("\n");
for(int i=0;i<query.length;i++) {
- int startOffset = query[i].indexOf("[")+1;
- int endOffset = query[i].indexOf("]");
if(query[i].equals("")) {
- } else if(startOffset==-1 || endOffset==-1) {
- log.error("Unable to extract table name from query:"+query[i]);
} else if(query[i].indexOf("#")==0) {
log.debug("skipping: "+query[i]);
} else {
- String table = query[i].substring(startOffset, endOffset);
Aggregator dba = new Aggregator();
- dba.process(table, query[i]);
+ dba.process(query[i]);
}
}
- loader.clean();
+ db.close();
+ log.info("Aggregator finished.");
}
}
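
The new past_N_minutes handling above is what lets aggregator.sql use windows such as '[past_10_minutes]' and '[past_5_minutes]': computeMacro matches the macro against the pattern past_(.*)_minutes, rounds the current time down to an N-minute boundary, and then steps back one full period, so the same window boundaries come out regardless of exactly when within the period the job runs. A minimal stand-alone sketch of just that arithmetic (class and method names here are illustrative, not part of the patch):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Sketch of the timestamp math behind the past_N_minutes macros.
    public class PastMinutesSketch {
        static long pastMinutesBoundary(String macro, long currentMillis) {
            Matcher m = Pattern.compile("past_(.*)_minutes").matcher(macro);
            if (!m.find()) {
                throw new IllegalArgumentException("not a past_N_minutes macro: " + macro);
            }
            int period = Integer.parseInt(m.group(1));
            long periodMillis = period * 60 * 1000L;
            // align to the N-minute boundary, then step back one full period
            return currentMillis - (currentMillis % periodMillis) - periodMillis;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            // e.g. at 17:11 this prints the 17:00 boundary for past_10_minutes
            System.out.println(new java.sql.Timestamp(pastMinutesBoundary("past_10_minutes", now)));
        }
    }
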
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java?rev=728206&r1=728205&r2=728206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java Fri Dec 19 17:11:42 2008
@@ -27,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
public class DatabaseWriter {
private static Log log = LogFactory.getLog(DatabaseWriter.class);
@@ -35,26 +36,25 @@
private ResultSet rs = null;
public DatabaseWriter(String host, String user, String password) {
- String jdbc_url = System.getenv("JDBC_URL_PREFIX")+host+"/";
-
- if(user!=null) {
+ DataConfig mdlConfig = new DataConfig();
+ String jdbc_url = "jdbc:mysql://"+host+"/";
+ if(user!=null) {
jdbc_url = jdbc_url + "?user=" + user;
if(password!=null) {
jdbc_url = jdbc_url + "&password=" + password;
}
- }
+ }
try {
// The newInstance() call is a work around for some
// broken Java implementations
- String jdbcDriver = System.getenv("JDBC_DRIVER");
- Class.forName(jdbcDriver).newInstance();
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
} catch (Exception ex) {
// handle the error
log.error(ex,ex);
}
try {
conn = DriverManager.getConnection(jdbc_url);
- log.info("Initialized JDBC URL: "+jdbc_url);
+ log.debug("Initialized JDBC URL: "+jdbc_url);
} catch (SQLException ex) {
log.error(ex,ex);
}
@@ -66,20 +66,43 @@
try {
// The newInstance() call is a work around for some
// broken Java implementations
- String jdbcDriver = System.getenv("JDBC_DRIVER");
- Class.forName(jdbcDriver).newInstance();
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
} catch (Exception ex) {
// handle the error
log.error(ex,ex);
}
try {
conn = DriverManager.getConnection(jdbc_url);
- log.info("Initialized JDBC URL: "+jdbc_url);
+ log.debug("Initialized JDBC URL: "+jdbc_url);
} catch (SQLException ex) {
log.error(ex,ex);
}
}
+ public DatabaseWriter() {
+ DataConfig mdlConfig = new DataConfig();
+ String jdbc_url = "jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
+ if(mdlConfig.get("jdbc.user")!=null) {
+ jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+ if(mdlConfig.get("jdbc.password")!=null) {
+ jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+ }
+ }
+ try {
+ // The newInstance() call is a work around for some
+ // broken Java implementations
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
+ } catch (Exception ex) {
+ // handle the error
+ log.error(ex,ex);
+ }
+ try {
+ conn = DriverManager.getConnection(jdbc_url);
+ log.debug("Initialized JDBC URL: "+jdbc_url);
+ } catch (SQLException ex) {
+ log.error(ex,ex);
+ }
+ }
public void execute(String query) {
try {
stmt = conn.createStatement();
@@ -102,6 +125,9 @@
}
}
}
+ public Connection getConnection() {
+ return conn;
+ }
public ResultSet query(String query) throws SQLException {
try {
stmt = conn.createStatement();
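
The getConnection() accessor added above is what lets Aggregator.computeMacro build its avg()/count() column lists from JDBC metadata (DatabaseMetaData.getColumns) instead of selecting a sample row, while the new no-argument constructor assembles its MySQL URL from the jdbc.* properties in DataConfig. A small usage sketch of the metadata-driven column expansion, roughly as computeMacro now does it (the real code also skips TIMESTAMP columns and handles the group_avg variant); the connection URL, database, table, and user below are placeholders:

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Sketch: enumerate a table's columns via JDBC metadata and choose an
    // aggregate expression per column.
    public class ColumnMetadataSketch {
        public static void main(String[] args) throws SQLException {
            // placeholder URL; DatabaseWriter builds the real one from jdbc.conf / DataConfig
            Connection conn = DriverManager.getConnection("jdbc:mysql://localhost/chukwa?user=demo");
            DatabaseMetaData meta = conn.getMetaData();
            ResultSet rs = meta.getColumns(null, null, "system_metrics", null);
            while (rs.next()) {
                String name = rs.getString(4); // COLUMN_NAME
                int type = rs.getInt(5);       // DATA_TYPE, a java.sql.Types constant
                String expr = (type == java.sql.Types.VARCHAR)
                        ? "count(" + name + ") as " + name
                        : "avg(" + name + ")";
                System.out.println(expr);
            }
            conn.close();
        }
    }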