Author: eyang
Date: Fri Feb 27 18:23:01 2009
New Revision: 748628

URL: http://svn.apache.org/viewvc?rev=748628&view=rev
Log:
HADOOP-5029.  Added mdl script to manually load chukwa sequence file to 
database.

Added:
    hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh
Modified:
    
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java

Added: hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh?rev=748628&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh (added)
+++ hadoop/core/trunk/src/contrib/chukwa/bin/mdl.sh Fri Feb 27 18:23:01 2009
@@ -0,0 +1,28 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/chukwa-config.sh
+
+if [ $# -lt 2 ]; then
+    echo "Usage: mdl.sh <cluster name> <chukwa sequence file>"
+    echo ""
+    exit 1
+fi
+
+${JAVA_HOME}/bin/java -Xms2048M -Xmx3096M 
-DDATACONFIG=${CHUKWA_CONF_DIR}/mdl.xml 
-Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} 
-DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} 
-DAPP=MDL -Dlog4j.configuration=chukwa-log4j.properties -classpath 
${CLASSPATH}:/homes/eyang/chukwa-core-0.1.1.jar:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_HOME}/conf
 org.apache.hadoop.chukwa.extraction.database.MetricDataLoader $@
+

Modified: 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=748628&r1=748627&r2=748628&view=diff
==============================================================================
--- 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
 (original)
+++ 
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
 Fri Feb 27 18:23:01 2009
@@ -57,6 +57,7 @@
      private static HashMap<String, String> dbTables = null;
      private HashMap<String, HashMap<String,Integer>> dbSchema = null;
      private static String newSpace="-";
+     private static boolean batchMode = true;
 
      /** Creates a new instance of DBWriter */
     public MetricDataLoader() {        
@@ -146,31 +147,30 @@
     
     public void process(Path source)  throws IOException, URISyntaxException, 
SQLException {
 
-               System.out.println("Input file:" + source.getName());
+        System.out.println("Input file:" + source.getName());
 
-               ChukwaConfiguration conf = new ChukwaConfiguration();
-               String fsName = conf.get("writer.hdfs.filesystem");
-               FileSystem fs = FileSystem.get(new URI(fsName), conf);
+        ChukwaConfiguration conf = new ChukwaConfiguration();
+        String fsName = conf.get("writer.hdfs.filesystem");
+        FileSystem fs = FileSystem.get(new URI(fsName), conf);
 
-               SequenceFile.Reader r = 
+        SequenceFile.Reader r = 
                        new SequenceFile.Reader(fs,source, conf);
 
         stmt = conn.createStatement(); 
         conn.setAutoCommit(false);
         
-               ChukwaRecordKey key = new ChukwaRecordKey();
-               ChukwaRecord record = new ChukwaRecord();
-               try
-               {
-                       while (r.next(key, record))
-                       {
-                               boolean isSuccessful=true;
-                String sqlTime = 
DatabaseWriter.formatTimeStamp(record.getTime());
-                               log.debug("Timestamp: " + record.getTime());
-                               log.debug("DataType: " + key.getReduceType());
-                               log.debug("StreamName: " + source.getName());
+        ChukwaRecordKey key = new ChukwaRecordKey();
+        ChukwaRecord record = new ChukwaRecord();
+        try {
+            int batch=0;
+            while (r.next(key, record)) {
+                    boolean isSuccessful=true;
+                    String sqlTime = 
DatabaseWriter.formatTimeStamp(record.getTime());
+                    log.debug("Timestamp: " + record.getTime());
+                    log.debug("DataType: " + key.getReduceType());
+                    log.debug("StreamName: " + source.getName());
                
-                               String[] fields = record.getFields();
+                    String[] fields = record.getFields();
                    String table = null;
                    String[] priKeys = null;
                    HashMap<String, HashMap<String, String>> hashReport = new 
HashMap<String ,HashMap<String, String>>();
@@ -232,6 +232,7 @@
                    }
                    Iterator<String> i = hashReport.keySet().iterator();
                    while(i.hasNext()) {
+                       long currentTimeMillis = System.currentTimeMillis();
                        Object iteratorNode = i.next();
                        HashMap<String, String> recordSet = 
hashReport.get(iteratorNode);
                        Iterator<String> fi = recordSet.keySet().iterator();
@@ -298,19 +299,28 @@
                                  " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
                        }
                        log.debug(sql);
-                       stmt.addBatch(sql);
+                       if(batchMode) {
+                               stmt.addBatch(sql);
+                               batch++;
+                       } else {
+                               stmt.execute(sql);
+                       }
                        String logMsg = (isSuccessful ? "Saved" : "Error 
occurred in saving");
-                       long currentTimeMillis = System.currentTimeMillis();
-                       long latencyMillis = currentTimeMillis - 
record.getTime();
+                       long latencyMillis = System.currentTimeMillis() - 
currentTimeMillis;
                        int latencySeconds = ((int)(latencyMillis + 500)) / 
1000;
+                               if(batchMode && batch>20000) {
+                                   int[] updateCounts = stmt.executeBatch();
+                                   batch=0;
+                               }
                        log.debug(logMsg + " (" + recordType + "," + 
RecordUtil.getClusterName(record) +
                               "," + record.getTime() +
                               ") " + latencySeconds + " sec");                
                    }
 
                        }
-                       @SuppressWarnings("unused")
-                       int[] updateCounts = stmt.executeBatch();
+                       if(batchMode) {
+                           int[] updateCounts = stmt.executeBatch();
+                       }
                } catch (SQLException ex) {
                        // handle any errors
                        log.error(ex, ex);
@@ -341,8 +351,8 @@
     
        public static void main(String[] args) {
                try {
-                       MetricDataLoader mdl = new MetricDataLoader();
-                       mdl.process(new Path(args[0]));
+                       MetricDataLoader mdl = new MetricDataLoader(args[0]);
+                       mdl.process(new Path(args[1]));
                } catch(Exception e) {
                        e.printStackTrace();
                }


Reply via email to