This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch feature/GEODE-3940
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3940 by this 
push:
     new 96e327a  Added FlushToDiskOperation
96e327a is described below

commit 96e327ac297914be6f5db1ecc988c51cb8a18399
Author: Darrel Schneider <[email protected]>
AuthorDate: Mon Nov 13 15:42:02 2017 -0800

    Added FlushToDiskOperation
---
 .../cache/backup/BackupDataStoreHelper.java        |   3 +-
 .../geode/internal/cache/backup/FlushToDisk.java   |  33 +++++
 .../internal/cache/backup/FlushToDiskFactory.java  |  44 +++++++
 ...oDiskRequest.java => FlushToDiskOperation.java} |  70 ++++------
 .../internal/cache/backup/FlushToDiskRequest.java  |  66 ++--------
 .../cache/backup/FlushToDiskFactoryTest.java       |  83 ++++++++++++
 .../cache/backup/FlushToDiskOperationTest.java     | 121 +++++++++++++++++
 .../cache/backup/FlushToDiskRequestTest.java       | 146 ++++-----------------
 .../cache/backup/PrepareBackupRequestTest.java     |   1 -
 9 files changed, 349 insertions(+), 218 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
index 455111d..d5d5386 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
@@ -29,13 +29,14 @@ public class BackupDataStoreHelper {
   public static final String LOCK_SERVICE_NAME = 
BackupDataStoreHelper.class.getSimpleName();
   private static final String LOCK_NAME = LOCK_SERVICE_NAME + "_token";
   private static final Object LOCK_SYNC = new Object();
+  private static final FlushToDiskFactory flushToDiskFactory = new 
FlushToDiskFactory();
   private static final PrepareBackupFactory prepareBackupFactory = new 
PrepareBackupFactory();
   private static final FinishBackupFactory finishBackupFactory = new 
FinishBackupFactory();
 
   @SuppressWarnings("rawtypes")
   public static BackupDataStoreResult backupAllMembers(DM dm, Set recipients, 
File targetDir,
       File baselineDir) {
-    FlushToDiskRequest.send(dm, recipients);
+    new FlushToDiskOperation(dm, dm.getId(), dm.getCache(), recipients, 
flushToDiskFactory).send();
 
     boolean abort = true;
     Map<DistributedMember, Set<PersistentID>> successfulMembers;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
new file mode 100644
index 0000000..8149d34
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.cache.InternalCache;
+
+class FlushToDisk {
+
+  private final InternalCache cache;
+
+  FlushToDisk(InternalCache cache) {
+    this.cache = cache;
+  }
+
+  void run() {
+    if (cache != null) {
+      cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush);
+    }
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
new file mode 100644
index 0000000..9d91997
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geode.cache.persistence.PersistentID;
+import org.apache.geode.distributed.internal.DM;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+
+class FlushToDiskFactory {
+
+  FlushToDiskProcessor createReplyProcessor(DM dm, 
Set<InternalDistributedMember> recipients) {
+    return new FlushToDiskProcessor(dm, recipients);
+  }
+
+  FlushToDiskRequest createRequest(InternalDistributedMember sender,
+      Set<InternalDistributedMember> recipients, int processorId) {
+    return new FlushToDiskRequest(sender, recipients, processorId, this);
+  }
+
+  FlushToDisk createFlushToDisk(InternalCache cache) {
+    return new FlushToDisk(cache);
+  }
+
+  FlushToDiskResponse createResponse(InternalDistributedMember sender) {
+    return new FlushToDiskResponse(sender);
+  }
+
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskOperation.java
similarity index 55%
copy from 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
copy to 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskOperation.java
index 8910527..afeda08 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskOperation.java
@@ -21,7 +21,9 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.admin.remote.AdminResponse;
 import org.apache.geode.internal.admin.remote.CliLegacyMessage;
@@ -29,43 +31,34 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.logging.LogService;
 
 /**
- * A request to from an admin VM to all non admin members to start a backup. 
In the prepare phase of
- * the backup, the members will suspend bucket destroys to make sure buckets 
aren't missed during
+ * A Operation to from an admin VM to all non admin members to start a backup. 
In the prepare phase
+ * of the backup, the members will suspend bucket destroys to make sure 
buckets aren't missed during
  * the backup.
  */
-public class FlushToDiskRequest extends CliLegacyMessage {
+public class FlushToDiskOperation {
   private static final Logger logger = LogService.getLogger();
 
   private final DM dm;
-  private final FlushToDiskProcessor replyProcessor;
-
-  public FlushToDiskRequest() {
-    super();
-    this.dm = null;
-    this.replyProcessor = null;
-  }
-
-  private FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients) 
{
-    this(dm, recipients, new FlushToDiskProcessor(dm, recipients));
-  }
-
-  FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients,
-      FlushToDiskProcessor replyProcessor) {
+  private final InternalDistributedMember member;
+  private final InternalCache cache;
+  private final Set<InternalDistributedMember> recipients;
+  private final FlushToDiskFactory flushToDiskFactory;
+
+  FlushToDiskOperation(DM dm, InternalDistributedMember member, InternalCache 
cache,
+      Set<InternalDistributedMember> recipients, FlushToDiskFactory 
flushToDiskFactory) {
+    this.flushToDiskFactory = flushToDiskFactory;
     this.dm = dm;
-    setRecipients(recipients);
-    this.replyProcessor = replyProcessor;
-    this.msgId = this.replyProcessor.getProcessorId();
-  }
-
-  public static void send(DM dm, Set recipients) {
-    FlushToDiskRequest request = new FlushToDiskRequest(dm, recipients);
-    request.send();
+    this.member = member;
+    this.recipients = recipients;
+    this.cache = cache;
   }
 
   void send() {
-    dm.putOutgoing(this);
+    ReplyProcessor21 replyProcessor = createReplyProcessor();
+
+    dm.putOutgoing(createDistributionMessage(replyProcessor));
 
-    AdminResponse response = createResponse(dm);
+    processLocally();
 
     try {
       replyProcessor.waitForReplies();
@@ -74,26 +67,21 @@ public class FlushToDiskRequest extends CliLegacyMessage {
         throw e;
       }
     } catch (InterruptedException e) {
-      logger.warn(e);
+      logger.warn(e.getMessage(), e);
     }
-
-    response.setSender(dm.getDistributionManagerId());
-    replyProcessor.process(response, false);
   }
 
-  @Override
-  protected AdminResponse createResponse(DM dm) {
-    InternalCache cache = dm.getCache();
-    if (cache != null) {
-      cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush);
-    }
+  private ReplyProcessor21 createReplyProcessor() {
+    return this.flushToDiskFactory.createReplyProcessor(dm, recipients);
+  }
 
-    return new FlushToDiskResponse(getSender());
+  private DistributionMessage createDistributionMessage(ReplyProcessor21 
replyProcessor) {
+    return this.flushToDiskFactory.createRequest(member, recipients,
+        replyProcessor.getProcessorId());
   }
 
-  @Override
-  public int getDSFID() {
-    return FLUSH_TO_DISK_REQUEST;
+  private void processLocally() {
+    flushToDiskFactory.createFlushToDisk(cache).run();
   }
 
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
index 8910527..f421447 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
@@ -16,17 +16,10 @@ package org.apache.geode.internal.cache.backup;
 
 import java.util.Set;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelException;
-import org.apache.geode.cache.DiskStore;
 import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.distributed.internal.ReplyException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.admin.remote.AdminResponse;
 import org.apache.geode.internal.admin.remote.CliLegacyMessage;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.logging.LogService;
 
 /**
  * A request to from an admin VM to all non admin members to start a backup. 
In the prepare phase of
@@ -34,66 +27,31 @@ import org.apache.geode.internal.logging.LogService;
  * the backup.
  */
 public class FlushToDiskRequest extends CliLegacyMessage {
-  private static final Logger logger = LogService.getLogger();
 
-  private final DM dm;
-  private final FlushToDiskProcessor replyProcessor;
+  private final FlushToDiskFactory flushToDiskFactory;
 
   public FlushToDiskRequest() {
     super();
-    this.dm = null;
-    this.replyProcessor = null;
-  }
-
-  private FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients) 
{
-    this(dm, recipients, new FlushToDiskProcessor(dm, recipients));
+    this.flushToDiskFactory = new FlushToDiskFactory();
   }
 
-  FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients,
-      FlushToDiskProcessor replyProcessor) {
-    this.dm = dm;
+  FlushToDiskRequest(InternalDistributedMember sender, 
Set<InternalDistributedMember> recipients,
+      int processorId, FlushToDiskFactory flushToDiskFactory) {
+    this.setSender(sender);
     setRecipients(recipients);
-    this.replyProcessor = replyProcessor;
-    this.msgId = this.replyProcessor.getProcessorId();
-  }
-
-  public static void send(DM dm, Set recipients) {
-    FlushToDiskRequest request = new FlushToDiskRequest(dm, recipients);
-    request.send();
-  }
-
-  void send() {
-    dm.putOutgoing(this);
-
-    AdminResponse response = createResponse(dm);
-
-    try {
-      replyProcessor.waitForReplies();
-    } catch (ReplyException e) {
-      if (!(e.getCause() instanceof CancelException)) {
-        throw e;
-      }
-    } catch (InterruptedException e) {
-      logger.warn(e);
-    }
-
-    response.setSender(dm.getDistributionManagerId());
-    replyProcessor.process(response, false);
+    this.msgId = processorId;
+    this.flushToDiskFactory = flushToDiskFactory;
   }
 
   @Override
-  protected AdminResponse createResponse(DM dm) {
-    InternalCache cache = dm.getCache();
-    if (cache != null) {
-      cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush);
-    }
-
-    return new FlushToDiskResponse(getSender());
+  public int getDSFID() {
+    return FLUSH_TO_DISK_REQUEST;
   }
 
   @Override
-  public int getDSFID() {
-    return FLUSH_TO_DISK_REQUEST;
+  protected AdminResponse createResponse(DM dm) {
+    flushToDiskFactory.createFlushToDisk(dm.getCache()).run();
+    return flushToDiskFactory.createResponse(getSender());
   }
 
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskFactoryTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskFactoryTest.java
new file mode 100644
index 0000000..cfb62ab
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskFactoryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.persistence.PersistentID;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class FlushToDiskFactoryTest {
+
+  private FlushToDiskFactory flushToDiskFactory;
+
+  private DM dm;
+  private InternalDistributedMember sender;
+  private Set<InternalDistributedMember> recipients;
+  private InternalDistributedMember member;
+  private InternalCache cache;
+
+  @Before
+  public void setUp() throws Exception {
+    dm = mock(DM.class);
+    sender = mock(InternalDistributedMember.class);
+    member = mock(InternalDistributedMember.class);
+    cache = mock(InternalCache.class);
+
+    recipients = new HashSet<>();
+
+    when(dm.getSystem()).thenReturn(mock(InternalDistributedSystem.class));
+    when(dm.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+
+    flushToDiskFactory = new FlushToDiskFactory();
+  }
+
+  @Test
+  public void createReplyProcessorReturnsFlushToDiskReplyProcessor() throws 
Exception {
+    assertThat(flushToDiskFactory.createReplyProcessor(dm, recipients))
+        .isInstanceOf(FlushToDiskProcessor.class);
+  }
+
+  @Test
+  public void createRequestReturnsFlushToDiskRequest() throws Exception {
+    assertThat(flushToDiskFactory.createRequest(sender, recipients, 1))
+        .isInstanceOf(FlushToDiskRequest.class);
+  }
+
+  @Test
+  public void createFlushToDiskReturnsFlushToDisk() throws Exception {
+    
assertThat(flushToDiskFactory.createFlushToDisk(cache)).isInstanceOf(FlushToDisk.class);
+  }
+
+  @Test
+  public void createResponseReturnsFlushToDiskResponse() {
+    
assertThat(flushToDiskFactory.createResponse(member)).isInstanceOf(FlushToDiskResponse.class);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskOperationTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskOperationTest.java
new file mode 100644
index 0000000..a79b43f
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskOperationTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import static org.mockito.Mockito.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.ReplyException;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class FlushToDiskOperationTest {
+
+  private DM dm;
+  private InternalCache cache;
+  private Set<InternalDistributedMember> recipients;
+
+  private InternalDistributedMember sender;
+
+  private FlushToDiskFactory flushToDiskFactory;
+  private FlushToDiskProcessor flushToDiskReplyProcessor;
+  private FlushToDiskRequest flushToDiskRequest;
+  private FlushToDisk flushToDisk;
+
+  private FlushToDiskOperation flushToDiskOperation;
+
+  @Before
+  public void setUp() throws Exception {
+    dm = mock(DM.class);
+    cache = mock(InternalCache.class);
+
+    flushToDiskReplyProcessor = mock(FlushToDiskProcessor.class);
+    flushToDiskRequest = mock(FlushToDiskRequest.class);
+    flushToDisk = mock(FlushToDisk.class);
+
+    flushToDiskFactory = mock(FlushToDiskFactory.class);
+
+    sender = mock(InternalDistributedMember.class, "sender");
+    recipients = new HashSet<>();
+
+    flushToDiskOperation =
+        new FlushToDiskOperation(dm, sender, cache, recipients, 
flushToDiskFactory);
+
+    when(flushToDiskReplyProcessor.getProcessorId()).thenReturn(42);
+
+    when(flushToDiskFactory.createReplyProcessor(eq(dm), eq(recipients)))
+        .thenReturn(flushToDiskReplyProcessor);
+    when(flushToDiskFactory.createRequest(eq(sender), eq(recipients), eq(42)))
+        .thenReturn(flushToDiskRequest);
+    
when(flushToDiskFactory.createFlushToDisk(eq(cache))).thenReturn(flushToDisk);
+  }
+
+  @Test
+  public void sendShouldSendFlushToDiskMessage() throws Exception {
+    flushToDiskOperation.send();
+
+    verify(dm, times(1)).putOutgoing(flushToDiskRequest);
+  }
+
+  @Test
+  public void sendShouldHandleCancelExceptionFromWaitForReplies() throws 
Exception {
+    ReplyException replyException =
+        new ReplyException("expected exception", new 
CacheClosedException("expected exception"));
+    doThrow(replyException).when(flushToDiskReplyProcessor).waitForReplies();
+    flushToDiskOperation.send();
+  }
+
+  @Test
+  public void sendShouldHandleInterruptedExceptionFromWaitForReplies() throws 
Exception {
+    doThrow(new InterruptedException("expected 
exception")).when(flushToDiskReplyProcessor)
+        .waitForReplies();
+    flushToDiskOperation.send();
+  }
+
+  @Test(expected = ReplyException.class)
+  public void sendShouldThrowReplyExceptionWithNoCauseFromWaitForReplies() 
throws Exception {
+    doThrow(new ReplyException("expected 
exception")).when(flushToDiskReplyProcessor)
+        .waitForReplies();
+    flushToDiskOperation.send();
+  }
+
+  @Test(expected = ReplyException.class)
+  public void 
sendShouldThrowReplyExceptionWithCauseThatIsNotACancelFromWaitForReplies()
+      throws Exception {
+    doThrow(new ReplyException("expected exception", new 
RuntimeException("expected")))
+        .when(flushToDiskReplyProcessor).waitForReplies();
+    flushToDiskOperation.send();
+  }
+
+  @Test
+  public void sendShouldProcessLocallyBeforeWaitingForReplies() throws 
Exception {
+    InOrder inOrder = inOrder(flushToDisk, flushToDiskReplyProcessor);
+    flushToDiskOperation.send();
+
+    inOrder.verify(flushToDisk, times(1)).run();
+    inOrder.verify(flushToDiskReplyProcessor, times(1)).waitForReplies();
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
index f1ea67d..113bb70 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
@@ -14,33 +14,18 @@
  */
 package org.apache.geode.internal.cache.backup;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collection;
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
 import java.util.HashSet;
 import java.util.Set;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.InOrder;
 
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.DiskStore;
 import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.distributed.internal.ReplyException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.admin.remote.AdminResponse;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -49,130 +34,49 @@ public class FlushToDiskRequestTest {
 
   private FlushToDiskRequest flushToDiskRequest;
 
-  private FlushToDiskProcessor replyProcessor;
   private DM dm;
-  private InternalCache cache;
-
-  private DiskStore diskStore1;
-  private DiskStore diskStore2;
-  private Collection<DiskStore> diskStoreCollection;
-
-  private InternalDistributedMember localMember;
-  private InternalDistributedMember member1;
-  private InternalDistributedMember member2;
-
   private Set<InternalDistributedMember> recipients;
+  private int msgId;
+  private FlushToDiskFactory flushToDiskFactory;
+  private InternalDistributedMember sender;
+  private InternalCache cache;
+  private FlushToDisk flushToDisk;
 
   @Before
   public void setUp() throws Exception {
-    // mocks here
-    replyProcessor = mock(FlushToDiskProcessor.class);
     dm = mock(DM.class);
+    sender = mock(InternalDistributedMember.class);
     cache = mock(InternalCache.class);
-    diskStore1 = mock(DiskStore.class);
-    diskStore2 = mock(DiskStore.class);
-
-    diskStoreCollection = new ArrayList<>();
-    diskStoreCollection.add(diskStore1);
-    diskStoreCollection.add(diskStore2);
-
-    when(dm.getCache()).thenReturn(cache);
-    when(dm.getDistributionManagerId()).thenReturn(localMember);
-    
when(cache.listDiskStoresIncludingRegionOwned()).thenReturn(diskStoreCollection);
-
-    localMember = mock(InternalDistributedMember.class);
-    member1 = mock(InternalDistributedMember.class);
-    member2 = mock(InternalDistributedMember.class);
+    flushToDiskFactory = mock(FlushToDiskFactory.class);
+    flushToDisk = mock(FlushToDisk.class);
 
+    msgId = 42;
     recipients = new HashSet<>();
-    recipients.add(member1);
-    recipients.add(member2);
-
-    flushToDiskRequest = new FlushToDiskRequest(dm, recipients, 
replyProcessor);
-  }
-
-  @Test
-  public void getRecipientsReturnsRecipientMembers() throws Exception {
-    
assertThat(flushToDiskRequest.getRecipients()).hasSize(2).contains(member1, 
member2);
-  }
-
-  @Test
-  public void getRecipientsDoesNotIncludeNull() throws Exception {
-    InternalDistributedMember nullMember = null;
-
-    assertThat(flushToDiskRequest.getRecipients()).doesNotContain(nullMember);
-  }
-
-  @Test
-  public void sendShouldUseDMToSendMessage() throws Exception {
-    flushToDiskRequest.send();
-
-    verify(dm, times(1)).putOutgoing(flushToDiskRequest);
-  }
-
-  @Test
-  public void sendShouldWaitForRepliesFromRecipients() throws Exception {
-    flushToDiskRequest.send();
-
-    verify(replyProcessor, times(1)).waitForReplies();
-  }
-
-  @Test
-  public void sendShouldInvokeProcessLocally() throws Exception {
-    flushToDiskRequest.send();
-
-    verify(replyProcessor, times(1)).process(any(AdminResponse.class), 
eq(false));
-  }
-
-  @Test
-  public void sendShouldFlushDiskStores() throws Exception {
-    flushToDiskRequest.send();
 
-    verify(diskStore1, times(1)).flush();
-    verify(diskStore2, times(1)).flush();
-  }
-
-  @Test
-  public void sendShouldFlushDiskStoresInLocalMemberBeforeWaitingForReplies() 
throws Exception {
-    InOrder inOrder = inOrder(diskStore1, diskStore2, replyProcessor);
-
-    flushToDiskRequest.send();
-
-    // assert that prepareForBackup is invoked before invoking waitForReplies
-    inOrder.verify(diskStore1, times(1)).flush();
-    inOrder.verify(diskStore2, times(1)).flush();
-    inOrder.verify(replyProcessor, times(1)).waitForReplies();
-  }
-
-  @Test
-  public void repliesWithFinishBackupResponse() throws Exception {
-    flushToDiskRequest.send();
+    when(dm.getCache()).thenReturn(cache);
+    when(dm.getDistributionManagerId()).thenReturn(sender);
+    
when(flushToDiskFactory.createFlushToDisk(eq(cache))).thenReturn(flushToDisk);
+    
when(flushToDiskFactory.createResponse(eq(sender))).thenReturn(mock(FlushToDiskResponse.class));
 
-    verify(replyProcessor, times(1)).process(any(FlushToDiskResponse.class), 
eq(false));
+    flushToDiskRequest = new FlushToDiskRequest(sender, recipients, msgId, 
flushToDiskFactory);
   }
 
   @Test
-  public void 
sendShouldCompleteIfWaitForRepliesThrowsReplyExceptionCausedByCacheClosedException()
-      throws Exception {
-    doThrow(new ReplyException(new 
CacheClosedException())).when(replyProcessor).waitForReplies();
+  public void usesFactoryToCreateFlushToDisk() throws Exception {
+    flushToDiskRequest.createResponse(dm);
 
-    flushToDiskRequest.send();
+    verify(flushToDiskFactory, times(1)).createFlushToDisk(eq(cache));
   }
 
   @Test
-  public void 
sendShouldThrowIfWaitForRepliesThrowsReplyExceptionNotCausedByCancelException()
-      throws Exception {
-    doThrow(new ReplyException(new 
NullPointerException())).when(replyProcessor).waitForReplies();
+  public void usesFactoryToCreateResponse() throws Exception {
+    flushToDiskRequest.createResponse(dm);
 
-    assertThatThrownBy(() -> 
flushToDiskRequest.send()).isInstanceOf(ReplyException.class)
-        .hasCauseInstanceOf(NullPointerException.class);
+    verify(flushToDiskFactory, times(1)).createResponse(eq(sender));
   }
 
   @Test
-  public void sendCompletesWhenWaitForRepliesThrowsInterruptedException() 
throws Exception {
-    doThrow(new InterruptedException()).when(replyProcessor).waitForReplies();
-
-    flushToDiskRequest.send();
+  public void returnsFlushToDiskResponse() throws Exception {
+    
assertThat(flushToDiskRequest.createResponse(dm)).isInstanceOf(FlushToDiskResponse.class);
   }
-
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
index 706ef27..f1700bc 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
@@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to