Repository: geode
Updated Branches:
  refs/heads/develop 9646a3894 -> 517bb6227


GEODE-2472: Ensure that the byte buffers are completely flushed to the oplog 
channel.

The Oplog and OverflowOplog flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) 
methods do not check the result of the channel.write() call.

A single write() call may write only part of the buffered bytes to the channel. 
The write is now repeated until the bytes are completely written to the file channel.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/517bb622
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/517bb622
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/517bb622

Branch: refs/heads/develop
Commit: 517bb6227304545d5c0cad1e9c516a336d83434b
Parents: 9646a38
Author: Anil <aging...@pivotal.io>
Authored: Fri Mar 24 15:41:20 2017 -0700
Committer: Anil <aging...@pivotal.io>
Committed: Fri Mar 24 15:53:54 2017 -0700

----------------------------------------------------------------------
 .../org/apache/geode/internal/cache/Oplog.java  |   5 +-
 .../geode/internal/cache/OverflowOplog.java     |  25 ++++-
 .../geode/internal/cache/OplogFlushTest.java    | 111 ++++++++++++++++++-
 3 files changed, 133 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/517bb622/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index f6b4023..ca9468d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -5299,7 +5299,10 @@ public final class Oplog implements CompactableOplog, 
Flushable {
         this.bbArray[0] = b1;
         this.bbArray[1] = b2;
         b1.flip();
-        long flushed = olf.channel.write(this.bbArray);
+        long flushed = 0;
+        do {
+          flushed += olf.channel.write(this.bbArray);
+        } while (b2.hasRemaining());
         this.bbArray[0] = null;
         this.bbArray[1] = null;
         // update bytesFlushed after entire writeBuffer is flushed to fix bug 
41201

http://git-wip-us.apache.org/repos/asf/geode/blob/517bb622/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
index c7f8e0d..78f0c00 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
@@ -28,7 +28,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryDestroyedException;
 import org.apache.geode.distributed.OplogCancelledException;
@@ -232,7 +231,6 @@ class OverflowOplog implements CompactableOplog, Flushable {
     this.crf.f = f;
     this.crf.raf = new RandomAccessFile(f, "rw");
     this.crf.writeBuf = allocateWriteBuf(previous);
-    this.bbArray[0] = this.crf.writeBuf;
     preblow();
     logger.info(LocalizedMessage.create(LocalizedStrings.Oplog_CREATE_0_1_2,
         new Object[] {toString(), "crf", this.parent.getName()}));
@@ -256,7 +254,6 @@ class OverflowOplog implements CompactableOplog, Flushable {
     synchronized (this.crf) {
       ByteBuffer result = this.crf.writeBuf;
       this.crf.writeBuf = null;
-      this.bbArray[0] = null;
       return result;
     }
   }
@@ -744,6 +741,18 @@ class OverflowOplog implements CompactableOplog, Flushable 
{
     }
   }
 
+  /**
+   * Method to be used only for testing
+   * 
+   * @param ch Object to replace the channel in the Oplog.crf
+   * @return original channel object
+   */
+  FileChannel testSetCrfChannel(FileChannel ch) {
+    FileChannel chPrev = this.crf.channel;
+    this.crf.channel = ch;
+    return chPrev;
+  }
+
   @Override
   public final void flush(ByteBuffer b1, ByteBuffer b2) throws IOException {
     final OplogFile olf = this.crf;
@@ -752,10 +761,14 @@ class OverflowOplog implements CompactableOplog, 
Flushable {
         return;
       }
       try {
-        assert b1 == olf.writeBuf;
-        b1.flip();
+        this.bbArray[0] = b1;
         this.bbArray[1] = b2;
-        long flushed = olf.channel.write(this.bbArray);
+        b1.flip();
+        long flushed = 0;
+        do {
+          flushed += olf.channel.write(this.bbArray);
+        } while (b2.hasRemaining());
+        this.bbArray[0] = null;
         this.bbArray[1] = null;
         // update bytesFlushed after entire writeBuffer is flushed to fix bug 
41201
         olf.bytesFlushed += flushed;

http://git-wip-us.apache.org/repos/asf/geode/blob/517bb622/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
index d24e66d..7f4f056 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
@@ -23,6 +23,9 @@ import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.List;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -31,8 +34,8 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TestName;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-
 import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.cache.Scope;
 import org.apache.geode.internal.cache.persistence.UninterruptibleFileChannel;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
@@ -82,6 +85,32 @@ public class OplogFlushTest extends DiskRegionTestingBase {
     }
   }
 
+  class FakeChannelWriteArrayBB implements Answer<Integer> {
+    @Override
+    public Integer answer(InvocationOnMock invocation) throws Throwable {
+      System.out.println("### in FakeChannelWriteArrayBB.answer :");
+      return fakeWriteArrayBB(bbArray);
+    }
+  }
+
+  /**
+   * This method tries to write half of the byte buffer to the channel.
+   */
+  private int fakeWriteArrayBB(ByteBuffer[] bbArray) throws IOException {
+    nFakeChannelWrites++;
+    for (ByteBuffer b : bbArray) {
+      int numFakeWrite = b.limit() / 2;
+      if (b.position() <= 0) {
+        b.position(numFakeWrite);
+        return numFakeWrite;
+      } else if (b.position() == numFakeWrite) {
+        b.position(b.limit());
+        return b.limit() - numFakeWrite;
+      }
+    }
+    return 0;
+  }
+
   private void doChannelFlushWithFailures(Oplog[] oplogs, int numFailures) 
throws IOException {
     nFakeChannelWrites = numFailures;
     ol = oplogs[0];
@@ -166,4 +195,84 @@ public class OplogFlushTest extends DiskRegionTestingBase {
 
     doChannelFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog 
*/);
   }
+
+  private void doPartialChannelByteArrayFlushForOpLog(Oplog[] oplogs) throws 
IOException {
+    ol = oplogs[0];
+    ch = ol.getFileChannel();
+    spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
17, 18, 19};
+    byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 
111, 112, 113, 114, 115,
+        116, 117, 118, 119};
+
+    bbArray[0] = bb1 = ByteBuffer.allocate(entry1.length).put(entry1);
+    bbArray[1] = bb2 = ByteBuffer.allocate(entry2.length).put(entry2);
+
+    try {
+      // Set fake channel, that pretends to write partial data.
+      doAnswer(new FakeChannelWriteArrayBB()).when(spyCh).write(bbArray);
+
+      bb2.flip();
+      ol.flush(bb1, bb2);
+      assertEquals("Incomplete flush calls.", 4, nFakeChannelWrites);
+
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  private void doPartialChannelByteArrayFlushForOverflowOpLog(OverflowOplog 
oplog)
+      throws IOException {
+    OverflowOplog ol = oplog;
+    FileChannel ch = ol.getFileChannel();
+    FileChannel spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
17, 18, 19};
+    byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 
111, 112, 113, 114, 115,
+        116, 117, 118, 119};
+
+    bbArray[0] = bb1 = ByteBuffer.allocate(entry1.length).put(entry1);
+    bbArray[1] = bb2 = ByteBuffer.allocate(entry2.length).put(entry2);
+
+    try {
+      // Set fake channel, that pretends to write partial data.
+      doAnswer(new FakeChannelWriteArrayBB()).when(spyCh).write(bbArray);
+
+      bb2.flip();
+      ol.flush(bb1, bb2);
+      assertEquals("Incomplete flush calls.", 4, nFakeChannelWrites);
+
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  @Test
+  public void testOplogByteArrayFlush() throws Exception {
+    region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, 
Scope.LOCAL);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doPartialChannelByteArrayFlushForOpLog(oplogs);
+  }
+
+  @Test
+  public void testOverflowOplogByteArrayFlush() throws Exception {
+    DiskRegionProperties props = new DiskRegionProperties();
+    props.setOverFlowCapacity(1);
+    region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, props);
+    region.put("K1", "v1");
+    region.put("K2", "v2");
+
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    OverflowOplog oplog = 
dr.getDiskStore().overflowOplogs.getActiveOverflowOplog();
+    assertNotNull("Unexpected null Oplog", oplog);
+
+    doPartialChannelByteArrayFlushForOverflowOpLog(oplog);
+  }
+
 }

Reply via email to