While running a write stress test, I found that the throughput was not stable: it fluctuated between 200 requests/s and 12,000 requests/s under continuous single-threaded writes. I eventually traced the problem to the NonBlockingHashMap used in Memtable. After I replaced it with ConcurrentHashMap and removed the synchronized block shown below, the stress test throughput became stable.
// resolve function in Memtable
synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)]) { int oldSize = oldCf.size(); int oldObjectCount = oldCf.getColumnCount(); oldCf.resolve(columnFamily); int newSize = oldCf.size(); int newObjectCount = oldCf.getColumnCount(); resolveSize(oldSize, newSize); resolveCount(oldObjectCount, newObjectCount); }
below is my test code,my test server has 2*4=8CPU and 32G Memory // Decompiled by Jad v1.5.8e. Copyright 2001 Pavel Kouznetsov. // Jad home page: http://www.geocities.com/kpdus/jad.html // Decompiler options: packimports(3) // Source File Name: RowApplyTest.java import java.io.IOException; import java.io.PrintStream; import java.util.concurrent.atomic.AtomicLong; import org.apache.cassandra.db.*; public class RowApplyTest { public RowApplyTest() { } public static Column column(String name, String value, long timestamp) { return new Column(name.getBytes(), value.getBytes(), timestamp); } private static void printer() { Thread t = new Thread(new Runnable() { public void run() { do { long current = RowApplyTest._count.get(); System.out.println((new StringBuilder("Rate: ")).append(current - _last).append(" req/s").toString()); _last = current; try { Thread.sleep(1000L); } catch(InterruptedException e) { e.printStackTrace(); } } while(true); } private long _last =0L; }); t.start(); } public static void main(String args[]) throws IOException { printer(); Table table = Table.open("Keyspace1"); ColumnFamilyStore cfStore = table.getColumnFamilyStore("Standard1"); String value = "Agile testing(\u654F\u6377\u6D4B\u8BD5)\u57FA \u672C\u4E0A\u662F\u4F34\u968F\u7740\u654F\u6377\u5F00\u53D1\u7684\u6982 \u5FF5\u6210\u957F\u8D77\u6765\u7684\uFF0C\u4F46\u5728\u53D7\u5173\u6CE8 \u7A0B\u5EA6\u4E0A\uFF0C\u8FDC\u8FDC\u4E0D\u53CA\u654F\u6377\u5F00\u53D1 \u672C\u8EAB\u3002\u81EA\u7136\uFF0C\u5F00\u53D1\u961F\u4F0D\u4ECE\u6570 \u91CF\u548C\u6D3B\u8DC3\u5EA6\u4E0A\u6765\u8BB2\u5927\u4E8E\u6D4B\u8BD5 \u961F\u4F0D\uFF0C\u662F\u5176\u4E2D\u7684\u4E00\u4E2A\u539F\u56E0\uFF1B \u9664\u4E86\u8FD9\u4E2A\u539F\u56E0\u4E4B\u5916\uFF0C\u201C\u654F\u6377 \u6D4B\u8BD5\u7A76\u7ADF\u5982\u4F55\u5728\u9879\u76EE\u4E2D\u53D1\u6325 \u4F5C\u7528\u201D\u8FD9\u4E2A\u95EE\u9898\u53EF\u80FD\u4E5F\u662F\u5BFC \u81F4\u654F\u6377\u6D4B\u8BD5\u6982\u5FF5\u7684\u6D41\u884C\u5EA6\u8FDC 
\u8FDC\u4E0D\u5982\u654F\u6377\u5F00\u53D1\u7684\u539F\u56E0\u4E4B\u4E00 \u3002\u5173\u4E8E\u654F\u6377\u6D4B\u8BD5\uFF0C\u6211\u80FD\u627E\u5230 \u7684\u8F83\u65E9\u7684..."; do { long i = _count.incrementAndGet(); String key = (new StringBuilder("test")).append(i).toString(); RowMutation rm = new RowMutation("Keyspace1", key); ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1"); cf.addColumn(column("name", value, 1L)); rm.add(cf); rm.apply(); } while(true); } private static AtomicLong _count = new AtomicLong(0L); }
// Decompiled by Jad v1.5.8e. Copyright 2001 Pavel Kouznetsov. // Jad home page: http://www.geocities.com/kpdus/jad.html // Decompiler options: packimports(3) // Source File Name: RowApplyTest.java import java.io.IOException; import java.io.PrintStream; import java.util.concurrent.atomic.AtomicLong; import org.apache.cassandra.db.*; public class RowApplyTest { public RowApplyTest() { } public static Column column(String name, String value, long timestamp) { return new Column(name.getBytes(), value.getBytes(), timestamp); } private static void printer() { Thread t = new Thread(new Runnable() { public void run() { do { long current = RowApplyTest._count.get(); System.out.println((new StringBuilder("Rate: ")).append(current - _last).append(" req/s").toString()); _last = current; try { Thread.sleep(1000L); } catch(InterruptedException e) { e.printStackTrace(); } } while(true); } private long _last =0L; }); t.start(); } public static void main(String args[]) throws IOException { printer(); Table table = Table.open("Keyspace1"); ColumnFamilyStore cfStore = table.getColumnFamilyStore("Standard1"); String value = "Agile 
testing(\u654F\u6377\u6D4B\u8BD5)\u57FA\u672C\u4E0A\u662F\u4F34\u968F\u7740\u654F\u6377\u5F00\u53D1\u7684\u6982\u5FF5\u6210\u957F\u8D77\u6765\u7684\uFF0C\u4F46\u5728\u53D7\u5173\u6CE8\u7A0B\u5EA6\u4E0A\uFF0C\u8FDC\u8FDC\u4E0D\u53CA\u654F\u6377\u5F00\u53D1\u672C\u8EAB\u3002\u81EA\u7136\uFF0C\u5F00\u53D1\u961F\u4F0D\u4ECE\u6570\u91CF\u548C\u6D3B\u8DC3\u5EA6\u4E0A\u6765\u8BB2\u5927\u4E8E\u6D4B\u8BD5\u961F\u4F0D\uFF0C\u662F\u5176\u4E2D\u7684\u4E00\u4E2A\u539F\u56E0\uFF1B\u9664\u4E86\u8FD9\u4E2A\u539F\u56E0\u4E4B\u5916\uFF0C\u201C\u654F\u6377\u6D4B\u8BD5\u7A76\u7ADF\u5982\u4F55\u5728\u9879\u76EE\u4E2D\u53D1\u6325\u4F5C\u7528\u201D\u8FD9\u4E2A\u95EE\u9898\u53EF\u80FD\u4E5F\u662F\u5BFC\u81F4\u654F\u6377\u6D4B\u8BD5\u6982\u5FF5\u7684\u6D41\u884C\u5EA6\u8FDC\u8FDC\u4E0D\u5982\u654F\u6377\u5F00\u53D1\u7684\u539F\u56E0\u4E4B\u4E00\u3002\u5173\u4E8E\u654F\u6377\u6D4B\u8BD5\uFF0C\u6211\u80FD\u627E\u5230\u7684\u8F83\u65E9\u7684..."; do { long i = _count.incrementAndGet(); String key = (new StringBuilder("test")).append(i).toString(); RowMutation rm = new RowMutation("Keyspace1", key); ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1"); cf.addColumn(column("name", value, 1L)); rm.add(cf); rm.apply(); } while(true); } private static AtomicLong _count = new AtomicLong(0L); }