First let's rule out the possibility that you are exhausting your memory and having your VM basically DOSed by GC thrashing. Eric goes into some details here: http://wiki.apache.org/cassandra/MemtableThresholds
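If memory checks out, one thing worth doing regardless: open the Thrift connection with a socket timeout, so a stalled server surfaces as a TTransportException instead of the permanent hang in socketWrite0 described below. A rough sketch (the host/port are placeholders; TSocket's third constructor argument is the timeout in milliseconds, and the connection setup mirrors the generated 0.3-era Cassandra.Client, so treat the exact class names as assumptions about that version):

```java
import org.apache.cassandra.service.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

public class TimeoutClient {
    public static Cassandra.Client open(String host, int port)
            throws TTransportException {
        // 30s socket timeout: reads/writes that stall longer than this
        // throw instead of blocking the reduce task forever.
        TSocket socket = new TSocket(host, port, 30000);
        socket.open();
        return new Cassandra.Client(new TBinaryProtocol(socket));
    }
}
```

With a timeout in place the reduce task at least fails fast with a stack trace pointing at the stalled call, rather than sitting silently until Hadoop's own task timeout kills it.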
On Mon, May 25, 2009 at 1:55 PM, Alexandre Linares <[email protected]> wrote:
> Following up on this discussion.
>
> Before going the BMT way, I decided to go ahead with the current APIs as a
> first full pass.
>
> Here's my setup:
> - Cassandra cluster (cassandra-0.3-rc): 3 nodes, vanilla setup except for
>   <MemtableSizeInMB>512</MemtableSizeInMB>
>   (b/c I was planning on only having one CF and I wanted as much of it in
>   memory as possible)
> - Hadoop jobs use cassandra-0.3.0-dev.jar
>
> My table setup:
> <Table Name="ClusterF">
>     <ColumnFamily ColumnType="Super" ColumnSort="Time" Name="Composite"/>
> </Table>
>
> I'm pushing data from a small Hadoop cluster for experimentation purposes
> (for debugging purposes, one reduce task is doing the batch inserts).
>
> I'm having a reproducible issue -- some kind of contention/race condition
> (I think) when calling send_batch_insert_superColumn. I'm attempting to
> push ~90k rows into Cassandra, but after ingesting ~3000 rows with
> Cassandra.Client, the client stops ingesting and the Cassandra logs stop
> showing activity (no exceptions/errors).
>
> Due to this, the Hadoop task times out.
>
> Here's what I get from a jstack on the Hadoop task making use of the
> Cassandra.Client (after 5 mins of no activity):
>
> [
> Full thread dump Java HotSpot(TM) Client VM (1.6.0-b105 mixed mode):
>
> "Attach Listener" daemon prio=10 tid=0x08078400 nid=0x4e2b waiting on
> condition [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Thread-33" daemon prio=10 tid=0x0807e000 nid=0x4d1e in Object.wait()
> [0x8f2c7000..0x8f2c7eb0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x92ac0d08> (a java.util.LinkedList)
>         at org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
>         - locked <0x92ac0d08> (a java.util.LinkedList)
>
> "Comm thread for attempt_200905242143_0004_r_000000_1" daemon prio=10
> tid=0x081c2c00 nid=0x4c52 waiting on condition [0x8f4fe000..0x8f4fee30]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at org.apache.hadoop.mapred.Task$1.run(Task.java:301)
>         at java.lang.Thread.run(Thread.java:619)
>
> "org.apache.hadoop.dfs.dfsclient$leasechec...@13f7281" daemon prio=10
> tid=0x081c0800 nid=0x4c51 waiting on condition [0x8f65c000..0x8f65cfb0]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at org.apache.hadoop.dfs.DFSClient$LeaseChecker.run(DFSClient.java:791)
>         at java.lang.Thread.run(Thread.java:619)
>
> "IPC Client (47) connection to /127.0.0.1:47906 from an unknown user"
> daemon prio=10 tid=0x0819c800 nid=0x4c4f in Object.wait()
> [0x8f6fe000..0x8f6ff0b0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>         at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:398)
>         - locked <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>         at org.apache.hadoop.ipc.Client$Connection.run(Client.java:441)
>
> "Low Memory Detector" daemon prio=10 tid=0x8fc13400 nid=0x4c4d runnable
> [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread0" daemon prio=10 tid=0x8fc11c00 nid=0x4c4c waiting on
> condition [0x00000000..0x8f9febc8]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" daemon prio=10 tid=0x8fc10800 nid=0x4c4b runnable
> [0x00000000..0x8fda8b90]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" daemon prio=10 tid=0x8fc00800 nid=0x4c4a in Object.wait()
> [0x8fdf9000..0x8fdf9e30]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:116)
>         - locked <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:132)
>         at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>
> "Reference Handler" daemon prio=10 tid=0x080a9800 nid=0x4c49 in
> Object.wait() [0x8fe4a000..0x8fe4afb0]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x92a26e30> (a java.lang.ref.Reference$Lock)
>         at java.lang.Object.wait(Object.java:485)
>         at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>         - locked <0x92a26e30> (a java.lang.ref.Reference$Lock)
>
> "main" prio=10 tid=0x0805a800 nid=0x4c47 runnable [0xb7fea000..0xb7feb288]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.SocketOutputStream.socketWrite0(Native Method)
>         at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>         at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
>         - locked <0x92ac9578> (a java.io.BufferedOutputStream)
>         at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:139)
>         at org.apache.thrift.protocol.TBinaryProtocol.writeBinary(TBinaryProtocol.java:184)
>         at org.apache.cassandra.service.column_t.write(column_t.java:321)
>         at org.apache.cassandra.service.superColumn_t.write(superColumn_t.java:291)
>         at org.apache.cassandra.service.batch_mutation_super_t.write(batch_mutation_super_t.java:365)
>         at org.apache.cassandra.service.Cassandra$batch_insert_superColumn_args.write(Cassandra.java:9776)
>         at org.apache.cassandra.service.Cassandra$Client.send_batch_insert_superColumn(Cassandra.java:546)
>         at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.pushDocuments(CassandraImport.java:168)
>         at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.sendOut(CassandraImport.java:146)
>         at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:127)
>         at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:1)
>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:318)
>         at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198)
> ]
>
> It looks like the client is waiting on a response from Cassandra but never
> gets it. Any ideas? I had seen similar behavior in the Cassandra code prior
> to the 0.3 release candidate, b/c of a race condition in SelectorManager.
> It looks like this was taken care of in 0.3-rc, so I'm not sure what's
> going on here.
>
> Thanks,
> -Alex
>
> ________________________________
> From: Jonathan Ellis <[email protected]>
> To: [email protected]
> Sent: Thursday, May 21, 2009 9:42:29 AM
> Subject: Re: Ingesting from Hadoop to Cassandra
>
> No, batch APIs are per CF, not per row.
>
> Several people have asked Avinash for sample code using BinaryMemtable
> but to my knowledge nothing ever came of that.
>
> The high level description of the BMT is that you give it serialized
> CFs as values instead of raw columns so it can just sort on key and
> write directly to disk.
> So then you would do something like this:
>
>     Table table = Table.open(mytablename);
>     ColumnFamilyStore store = table.getColumnFamilyStore(mycfname);
>     for cf : mydata
>         store.applyBinary(cf.key, toByteArray(cf))
>
> There's no provision for doing this over the network that I know of;
> you have to put the right keys on the right nodes manually.
>
> -Jonathan
>
> On Thu, May 21, 2009 at 11:27 AM, Alexandre Linares <[email protected]> wrote:
>> Jonathan,
>>
>> Thanks for your thoughts.
>>
>> I've done some simple benchmarks with the batch insert APIs and was
>> looking for something slightly more performant. Is there a batch row
>> insert that I missed?
>>
>> Any pointers (at all) to anything related to FB's bulk loading or the
>> BinaryMemtable? I've attempted to do this by writing a custom
>> IVerbHandler for ingestion and interfacing with the MessagingService
>> internally, but it's not that clean.
>>
>> Thanks again,
>> -Alex
>>
>> ________________________________
>> From: Jonathan Ellis <[email protected]>
>> To: [email protected]
>> Sent: Thursday, May 21, 2009 7:44:59 AM
>> Subject: Re: Ingesting from Hadoop to Cassandra
>>
>> Have you benchmarked the batch insert APIs? If that is "fast enough",
>> then it's by far the simplest way to go.
>>
>> Otherwise you'll have to use the BinaryMemtable stuff, which is
>> undocumented and not exposed as a client API (you basically write a
>> custom "loader" version of Cassandra to use it, I think). FB used this
>> for their own bulk loading, so it works at some level, but clearly
>> there is some assembly required.
>>
>> -Jonathan
>>
>> On Thu, May 21, 2009 at 2:28 AM, Alexandre Linares <[email protected]> wrote:
>>> Hi all,
>>>
>>> I'm trying to find the most optimal way to ingest my content from
>>> Hadoop to Cassandra. Assuming I have figured out the table
>>> representation for this content, what is the best way to go about
>>> pushing from my cluster? What Cassandra client batch APIs do you
>>> suggest I use to push to Cassandra? I'm sure this is a common pattern;
>>> I'm curious to see how it has been implemented. Assume millions of
>>> rows and 1000s of columns.
>>>
>>> Thanks in advance,
>>> -Alex
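Jonathan's BMT outline, written out as compilable Java for reference. This is a sketch under heavy assumptions: it mirrors the pseudocode quoted above against the 0.3-era internal API, it must run inside a process with Cassandra's storage configuration loaded (a custom "loader" build, as Jonathan describes), each key must be applied on the node that owns it, and `mydata` stands in for whatever produces the pre-serialized column families:

```java
import java.io.IOException;
import java.util.Map;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;

public class BmtLoader {
    /**
     * Applies pre-serialized CFs directly to the local store, bypassing
     * the Thrift layer. mydata maps row key -> serialized ColumnFamily
     * bytes (the equivalent of toByteArray(cf) in the outline above).
     */
    public static void load(String tableName, String cfName,
                            Map<String, byte[]> mydata) throws IOException {
        Table table = Table.open(tableName);
        ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
        for (Map.Entry<String, byte[]> entry : mydata.entrySet()) {
            store.applyBinary(entry.getKey(), entry.getValue());
        }
    }
}
```

Since there is no network provision, a Hadoop job using this would need one reduce task per Cassandra node, partitioned so that every key lands on the node that owns its token range.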
