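Fixed issue GORA-443: give the Put a later timestamp than the Delete in HBaseStore, and flush the BufferedMutator once after draining the buffers in HBaseTableConnection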
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/b0cd1950
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/b0cd1950
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/b0cd1950

Branch: refs/heads/master
Commit: b0cd1950c978181890213e1c85e437e44421405e
Parents: 5664dc6
Author: Kiyonari Harigae <[email protected]>
Authored: Wed Feb 22 18:53:14 2017 +0900
Committer: Kiyonari Harigae <[email protected]>
Committed: Wed Feb 22 18:53:14 2017 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/gora/hbase/store/HBaseStore.java | 10 ++++++++--
 .../org/apache/gora/hbase/store/HBaseTableConnection.java | 5 +++--
 2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/gora/blob/b0cd1950/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
index 7d9f527..8176c91 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
@@ -88,6 +88,9 @@ implements Configurable {
 
   private static final String SCANNER_CACHING_PROPERTIES_KEY = "scanner.caching" ;
   private static final int SCANNER_CACHING_PROPERTIES_DEFAULT = 0 ;
+
+  private static final int PUTS_AND_DELETES_PUT_TS_OFFSET = 1;
+  private static final int PUTS_AND_DELETES_DELETE_TS_OFFSET = 2;
 
   private volatile Admin admin;
 
@@ -236,8 +239,11 @@ implements Configurable {
     try {
       Schema schema = persistent.getSchema();
       byte[] keyRaw = toBytes(key);
-      Put put = new Put(keyRaw);
-      Delete delete = new Delete(keyRaw);
+      long timeStamp = System.currentTimeMillis();
+      // Guarantee Put after Delete
+      Put put = new Put(keyRaw, timeStamp - PUTS_AND_DELETES_PUT_TS_OFFSET);
+      Delete delete = new Delete(keyRaw, timeStamp - PUTS_AND_DELETES_DELETE_TS_OFFSET);
+
       List<Field> fields = schema.getFields();
       for (int i = 0; i < fields.size(); i++) {
         if (!persistent.isDirty(i)) {

http://git-wip-us.apache.org/repos/asf/gora/blob/b0cd1950/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
index c3c7009..3901fd4 100644
--- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
+++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java
@@ -110,11 +110,12 @@ public class HBaseTableConnection {
   public void flushCommits() throws IOException {
     BufferedMutator bufMutator = connection.getBufferedMutator(this.tableName);
     for (ConcurrentLinkedQueue<Mutation> buffer : bPool) {
-      for (Mutation m: buffer) {
+      while (!buffer.isEmpty()) {
+        Mutation m = buffer.poll();
         bufMutator.mutate(m);
-        bufMutator.flush();
       }
     }
+    bufMutator.flush();
     bufMutator.close();
   }
