As I said before, the JDBC interface for Hadoop solves a specific problem, whereas HBase and HDFS are really the answer to the kind of problem you're hinting at.
Fredrik

On Feb 6, 2009, at 4:06 PM, Stefan Podkowinski wrote:
On Fri, Feb 6, 2009 at 2:40 PM, Fredrik Hedberg <[email protected]> wrote:

> Well, that obviously depends on the RDBMS's implementation. And although the case is not as bad as you describe (otherwise you'd better ask your RDBMS vendor for your money back), your point is valid. But then again, an RDBMS is not designed for that kind of work.

Right. Clash of design paradigms. Hey MySQL, I want my money back!! Oh, wait..

Another scenario I just recognized: what about current/"realtime" data? E.g. 'select * from logs where date = today()'. Working with 'offset' may turn out to return different results after the table has been updated while tasks are still pending. This condition is pretty ugly to trace down, after you find out that sometimes your results are just not right..

> What do you mean by "creating splits/map tasks on the fly dynamically"?
>
> Fredrik

On Feb 5, 2009, at 4:49 PM, Stefan Podkowinski wrote:

As far as I understand, the main problem is that you need to create splits from streaming data with an unknown number of records and offsets. It's just the same problem as with externally compressed data (.gz). You need to go through the complete stream (or do a table scan) to create logical splits, and afterwards each map task needs to seek to the appropriate offset on a new stream all over again. Very expensive. As with compressed files, it's no wonder only one map task is started for each .gz file and consumes the complete file. IMHO the DBInputFormat should follow this behavior and just create one split whatsoever. Maybe a future version of Hadoop will allow creating splits/map tasks on the fly, dynamically?

Stefan

On Thu, Feb 5, 2009 at 3:28 PM, Fredrik Hedberg <[email protected]> wrote:

Indeed sir.

The implementation was designed like you describe for two reasons. First and foremost, to make it as simple as possible for the user to use a JDBC database as input and output for Hadoop. Secondly, because of the specific requirements the MapReduce framework brings to the table (split distribution, split reproducibility etc.).

This design will, as you note, never handle the same amount of data as HBase (or HDFS), and was never intended to. That being said, there are a couple of ways that the current design could be augmented to perform better (and, in its current form, tweaked, depending on your data and computational requirements). Shard awareness is one way, which would let each database/tasktracker node execute mappers on data where each split is a single database server, for example.

If you have any ideas on how the current design can be improved, please do share.

Fredrik

On Feb 5, 2009, at 11:37 AM, Stefan Podkowinski wrote:

The 0.19 DBInputFormat class implementation is IMHO only suitable for very simple queries working on only a few datasets. That's due to the fact that it tries to create splits from the query by

1) getting a count of all rows using the specified count query (a huge performance impact on large tables), and
2) creating splits by issuing an individual query for each split, with "limit" and "offset" parameters appended to the input SQL query.

Effectively, your input query "select * from orders" would become "select * from orders limit <splitlength> offset <splitstart>" and be executed until the count has been reached. I guess this is not working SQL syntax for Oracle.

Stefan
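To make the split mechanics concrete, here is a rough sketch of the per-split query rewriting Stefan describes, plus a ROWNUM-based form that Oracle would accept. The split values and the ROWNUM rewrite are illustrative assumptions, not something DBInputFormat 0.19 actually emits:

    // Per Stefan: each split re-issues the input query with paging bounds.
    // Generic form (not valid Oracle SQL, which lacks LIMIT/OFFSET):
    //   select * from orders LIMIT <splitlength> OFFSET <splitstart>
    String inputQuery = "select * from orders";
    long splitStart = 5000;   // hypothetical split offset
    long splitLength = 1000;  // hypothetical split size
    // An Oracle-compatible equivalent using ROWNUM (an assumption):
    String oracleSplitQuery =
        "SELECT * FROM ("
      + " SELECT q.*, ROWNUM rnum FROM (" + inputQuery + ") q"
      + " WHERE ROWNUM <= " + (splitStart + splitLength)
      + " ) WHERE rnum > " + splitStart;
    // Caveat: without a stable ORDER BY, rows can shift between offsets
    // while the table changes -- the "realtime data" problem raised above.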
2009/2/4 Amandeep Khurana <[email protected]>:

Adding a semicolon gives me the error "ORA-00911: Invalid character"

Amandeep

Amandeep Khurana
Computer Science Graduate Student
University of California, Santa Cruz

On Wed, Feb 4, 2009 at 6:46 AM, Rasit OZDAS <[email protected]> wrote:

Amandeep,

"SQL command not properly ended"

I get this error whenever I forget the semicolon at the end. I know it doesn't make sense, but I recommend giving it a try.

Rasit

2009/2/4 Amandeep Khurana <[email protected]>:

The same query is working if I write a simple JDBC client and query the database. So I'm probably doing something wrong in the connection settings. But the error looks to be on the query side more than the connection side.

Amandeep

Amandeep Khurana
Computer Science Graduate Student
University of California, Santa Cruz

On Tue, Feb 3, 2009 at 7:25 PM, Amandeep Khurana <[email protected]> wrote:

Thanks Kevin. I couldn't get it to work. Here's the error I get:

bin/hadoop jar ~/dbload.jar LoadTable1
09/02/03 19:21:17 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
09/02/03 19:21:20 INFO mapred.JobClient: Running job: job_local_0001
09/02/03 19:21:21 INFO mapred.JobClient:  map 0% reduce 0%
09/02/03 19:21:22 INFO mapred.MapTask: numReduceTasks: 0
09/02/03 19:21:24 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: ORA-00933: SQL command not properly ended
        at org.apache.hadoop.mapred.lib.db.DBInputFormat.getRecordReader(DBInputFormat.java:289)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:321)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:138)
java.io.IOException: Job failed!
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1217)
        at LoadTable1.run(LoadTable1.java:130)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at LoadTable1.main(LoadTable1.java:107)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
        at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
Exception closing file /user/amkhuran/contract_table/_temporary/_attempt_local_0001_m_000000_0/part-00000
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:198)
        at org.apache.hadoop.hdfs.DFSClient.access$600(DFSClient.java:65)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3084)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3053)
        at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.close(DFSClient.java:942)
        at org.apache.hadoop.hdfs.DFSClient.close(DFSClient.java:210)
        at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:243)
        at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:1413)
        at org.apache.hadoop.fs.FileSystem.closeAll(FileSystem.java:236)
        at org.apache.hadoop.fs.FileSystem$ClientFinalizer.run(FileSystem.java:221)

Here's my code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.db.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoadTable1 extends Configured implements Tool {

    // data destination on hdfs
    private static final String CONTRACT_OUTPUT_PATH = "contract_table";

    // The JDBC connection URL and driver implementation class
    private static final String CONNECT_URL = "jdbc:oracle:thin:@dbhost:1521:PSEDEV";
    private static final String DB_USER = "user";
    private static final String DB_PWD = "pass";
    private static final String DATABASE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";

    private static final String CONTRACT_INPUT_TABLE = "OSE_EPR_CONTRACT";
    private static final String[] CONTRACT_INPUT_TABLE_FIELDS = {"PORTFOLIO_NUMBER", "CONTRACT_NUMBER"};
    private static final String ORDER_CONTRACT_BY_COL = "CONTRACT_NUMBER";

    static class ose_epr_contract implements Writable, DBWritable {
        String CONTRACT_NUMBER;

        public void readFields(DataInput in) throws IOException {
            this.CONTRACT_NUMBER = Text.readString(in);
        }

        public void write(DataOutput out) throws IOException {
            Text.writeString(out, this.CONTRACT_NUMBER);
        }

        public void readFields(ResultSet in_set) throws SQLException {
            this.CONTRACT_NUMBER = in_set.getString(1);
        }

        @Override
        public void write(PreparedStatement prep_st) throws SQLException {
            // TODO Auto-generated method stub
        }
    }

    public static class LoadMapper extends MapReduceBase
            implements Mapper<LongWritable, ose_epr_contract, Text, NullWritable> {

        private static final char FIELD_SEPARATOR = 1;

        public void map(LongWritable arg0, ose_epr_contract arg1,
                OutputCollector<Text, NullWritable> arg2, Reporter arg3)
                throws IOException {
            StringBuilder sb = new StringBuilder();
            sb.append(arg1.CONTRACT_NUMBER);
            arg2.collect(new Text(sb.toString()), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Class.forName("oracle.jdbc.driver.OracleDriver");
        int exit = ToolRunner.run(new LoadTable1(), args);
    }

    public int run(String[] arg0) throws Exception {
        JobConf conf = new JobConf(getConf(), LoadTable1.class);
        conf.setInputFormat(DBInputFormat.class);
        DBConfiguration.configureDB(conf, DATABASE_DRIVER_CLASS, CONNECT_URL, DB_USER, DB_PWD);
        DBInputFormat.setInput(conf, ose_epr_contract.class,
                "select CONTRACT_NUMBER from OSE_EPR_CONTRACT",
                "select COUNT(CONTRACT_NUMBER) from OSE_EPR_CONTRACT");
        FileOutputFormat.setOutputPath(conf, new Path(CONTRACT_OUTPUT_PATH));
        conf.setMapperClass(LoadMapper.class);
        conf.setNumReduceTasks(0);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(NullWritable.class);
        JobClient.runJob(conf);
        return 0;
    }
}

-Amandeep

Amandeep Khurana
Computer Science Graduate Student
University of California, Santa Cruz
On Tue, Feb 3, 2009 at 6:51 PM, Kevin Peterson <[email protected]> wrote:

On Tue, Feb 3, 2009 at 5:49 PM, Amandeep Khurana <[email protected]> wrote:

> In the setInput(...) function in DBInputFormat, there are two sets of arguments that one can use.
>
> 1. public static void *setInput*(JobConf
>
> a) In this, do we necessarily have to give all the fieldNames (which are the column names, right?) that the table has, or do we need to specify only the ones that we want to extract?

You may specify only those columns that you are interested in.

> b) Do we have to have an orderBy or not necessarily? Does this relate to the primary key in the table in any way?

Conditions and order by are not necessary.

> a) Is there any restriction on the kind of queries that this function can take in the inputQuery string?

I don't think so, but I don't use this method -- I just use the fieldNames and tableName method.
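For illustration, the fieldNames/tableName variant Kevin mentions would look roughly like this for Amandeep's table. This is a sketch against the 0.19 setInput(JobConf, Class, tableName, conditions, orderBy, fieldNames...) overload, not tested code:

    DBInputFormat.setInput(conf, ose_epr_contract.class,
            "OSE_EPR_CONTRACT",        // tableName
            null,                      // conditions: none needed
            "CONTRACT_NUMBER",         // orderBy: keeps split boundaries stable
            "CONTRACT_NUMBER");        // fieldNames: only the columns you need

With this overload, DBInputFormat builds the SELECT itself, so nothing user-written has to survive the query rewriting; note, though, that the framework still appends its own limit/offset clause per split, as discussed above.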
> I am facing issues in getting this to work with an Oracle database and have no idea of how to debug it (an email sent earlier). Can anyone give me some inputs on this please?

Create a new table that has one column, put about five entries into that table, then try to get a map job working that outputs the values to a text file. If that doesn't work, post your code and errors.

--
M. Raşit ÖZDAŞ
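As a concrete version of the smoke test Kevin suggests above, here is a minimal, self-contained sketch. The table name TEST_TABLE, column VAL, and the connection settings are placeholders to adapt; it follows the 0.19 mapred API used elsewhere in this thread:

    import java.io.*;
    import java.sql.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.db.*;

    public class DBSmokeTest {
        // One row of the hypothetical one-column test table.
        public static class Row implements Writable, DBWritable {
            String val;
            public void readFields(DataInput in) throws IOException { val = Text.readString(in); }
            public void write(DataOutput out) throws IOException { Text.writeString(out, val); }
            public void readFields(ResultSet rs) throws SQLException { val = rs.getString(1); }
            public void write(PreparedStatement ps) throws SQLException { } // input-only job
        }

        public static class M extends MapReduceBase
                implements Mapper<LongWritable, Row, Text, NullWritable> {
            public void map(LongWritable k, Row v,
                    OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {
                out.collect(new Text(v.val), NullWritable.get()); // dump each value as a text line
            }
        }

        public static void main(String[] args) throws Exception {
            Class.forName("oracle.jdbc.driver.OracleDriver"); // ensure the driver is registered
            JobConf conf = new JobConf(DBSmokeTest.class);
            conf.setInputFormat(DBInputFormat.class);
            DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver",
                    "jdbc:oracle:thin:@dbhost:1521:SID", "user", "pass"); // placeholders
            // Table and column names are hypothetical; adjust to the five-row test table.
            DBInputFormat.setInput(conf, Row.class, "TEST_TABLE", null, "VAL", "VAL");
            FileOutputFormat.setOutputPath(conf, new Path("test_table_out"));
            conf.setMapperClass(M.class);
            conf.setNumReduceTasks(0); // map-only: values go straight to text files
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(NullWritable.class);
            JobClient.runJob(conf);
        }
    }

If the setup is healthy, the output directory should contain a part-00000 file listing the five test values, which isolates connection and query-rewriting problems from anything in the real job.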
