This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch feature/GEODE-3940
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3940 by this
push:
new 96e327a Added FlushToDiskOperation
96e327a is described below
commit 96e327ac297914be6f5db1ecc988c51cb8a18399
Author: Darrel Schneider <[email protected]>
AuthorDate: Mon Nov 13 15:42:02 2017 -0800
Added FlushToDiskOperation
---
.../cache/backup/BackupDataStoreHelper.java | 3 +-
.../geode/internal/cache/backup/FlushToDisk.java | 33 +++++
.../internal/cache/backup/FlushToDiskFactory.java | 44 +++++++
...oDiskRequest.java => FlushToDiskOperation.java} | 70 ++++------
.../internal/cache/backup/FlushToDiskRequest.java | 66 ++--------
.../cache/backup/FlushToDiskFactoryTest.java | 83 ++++++++++++
.../cache/backup/FlushToDiskOperationTest.java | 121 +++++++++++++++++
.../cache/backup/FlushToDiskRequestTest.java | 146 ++++-----------------
.../cache/backup/PrepareBackupRequestTest.java | 1 -
9 files changed, 349 insertions(+), 218 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
index 455111d..d5d5386 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreHelper.java
@@ -29,13 +29,14 @@ public class BackupDataStoreHelper {
public static final String LOCK_SERVICE_NAME =
BackupDataStoreHelper.class.getSimpleName();
private static final String LOCK_NAME = LOCK_SERVICE_NAME + "_token";
private static final Object LOCK_SYNC = new Object();
+ private static final FlushToDiskFactory flushToDiskFactory = new
FlushToDiskFactory();
private static final PrepareBackupFactory prepareBackupFactory = new
PrepareBackupFactory();
private static final FinishBackupFactory finishBackupFactory = new
FinishBackupFactory();
@SuppressWarnings("rawtypes")
public static BackupDataStoreResult backupAllMembers(DM dm, Set recipients,
File targetDir,
File baselineDir) {
- FlushToDiskRequest.send(dm, recipients);
+ new FlushToDiskOperation(dm, dm.getId(), dm.getCache(), recipients,
flushToDiskFactory).send();
boolean abort = true;
Map<DistributedMember, Set<PersistentID>> successfulMembers;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
new file mode 100644
index 0000000..8149d34
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.cache.InternalCache;
+
+class FlushToDisk {
+
+ private final InternalCache cache;
+
+ FlushToDisk(InternalCache cache) {
+ this.cache = cache;
+ }
+
+ void run() {
+ if (cache != null) {
+ cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush);
+ }
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
new file mode 100644
index 0000000..9d91997
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geode.cache.persistence.PersistentID;
+import org.apache.geode.distributed.internal.DM;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+
+class FlushToDiskFactory {
+
+ FlushToDiskProcessor createReplyProcessor(DM dm,
Set<InternalDistributedMember> recipients) {
+ return new FlushToDiskProcessor(dm, recipients);
+ }
+
+ FlushToDiskRequest createRequest(InternalDistributedMember sender,
+ Set<InternalDistributedMember> recipients, int processorId) {
+ return new FlushToDiskRequest(sender, recipients, processorId, this);
+ }
+
+ FlushToDisk createFlushToDisk(InternalCache cache) {
+ return new FlushToDisk(cache);
+ }
+
+ FlushToDiskResponse createResponse(InternalDistributedMember sender) {
+ return new FlushToDiskResponse(sender);
+ }
+
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskOperation.java
similarity index 55%
copy from
geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
copy to
geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskOperation.java
index 8910527..afeda08 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskOperation.java
@@ -21,7 +21,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.admin.remote.AdminResponse;
import org.apache.geode.internal.admin.remote.CliLegacyMessage;
@@ -29,43 +31,34 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
/**
- * A request to from an admin VM to all non admin members to start a backup.
In the prepare phase of
- * the backup, the members will suspend bucket destroys to make sure buckets
aren't missed during
+ * An operation sent from an admin VM to all non-admin members to start a backup.
In the prepare phase
+ * of the backup, the members will suspend bucket destroys to make sure
buckets aren't missed during
* the backup.
*/
-public class FlushToDiskRequest extends CliLegacyMessage {
+public class FlushToDiskOperation {
private static final Logger logger = LogService.getLogger();
private final DM dm;
- private final FlushToDiskProcessor replyProcessor;
-
- public FlushToDiskRequest() {
- super();
- this.dm = null;
- this.replyProcessor = null;
- }
-
- private FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients)
{
- this(dm, recipients, new FlushToDiskProcessor(dm, recipients));
- }
-
- FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients,
- FlushToDiskProcessor replyProcessor) {
+ private final InternalDistributedMember member;
+ private final InternalCache cache;
+ private final Set<InternalDistributedMember> recipients;
+ private final FlushToDiskFactory flushToDiskFactory;
+
+ FlushToDiskOperation(DM dm, InternalDistributedMember member, InternalCache
cache,
+ Set<InternalDistributedMember> recipients, FlushToDiskFactory
flushToDiskFactory) {
+ this.flushToDiskFactory = flushToDiskFactory;
this.dm = dm;
- setRecipients(recipients);
- this.replyProcessor = replyProcessor;
- this.msgId = this.replyProcessor.getProcessorId();
- }
-
- public static void send(DM dm, Set recipients) {
- FlushToDiskRequest request = new FlushToDiskRequest(dm, recipients);
- request.send();
+ this.member = member;
+ this.recipients = recipients;
+ this.cache = cache;
}
void send() {
- dm.putOutgoing(this);
+ ReplyProcessor21 replyProcessor = createReplyProcessor();
+
+ dm.putOutgoing(createDistributionMessage(replyProcessor));
- AdminResponse response = createResponse(dm);
+ processLocally();
try {
replyProcessor.waitForReplies();
@@ -74,26 +67,21 @@ public class FlushToDiskRequest extends CliLegacyMessage {
throw e;
}
} catch (InterruptedException e) {
- logger.warn(e);
+ logger.warn(e.getMessage(), e);
}
-
- response.setSender(dm.getDistributionManagerId());
- replyProcessor.process(response, false);
}
- @Override
- protected AdminResponse createResponse(DM dm) {
- InternalCache cache = dm.getCache();
- if (cache != null) {
- cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush);
- }
+ private ReplyProcessor21 createReplyProcessor() {
+ return this.flushToDiskFactory.createReplyProcessor(dm, recipients);
+ }
- return new FlushToDiskResponse(getSender());
+ private DistributionMessage createDistributionMessage(ReplyProcessor21
replyProcessor) {
+ return this.flushToDiskFactory.createRequest(member, recipients,
+ replyProcessor.getProcessorId());
}
- @Override
- public int getDSFID() {
- return FLUSH_TO_DISK_REQUEST;
+ private void processLocally() {
+ flushToDiskFactory.createFlushToDisk(cache).run();
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
index 8910527..f421447 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
@@ -16,17 +16,10 @@ package org.apache.geode.internal.cache.backup;
import java.util.Set;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelException;
-import org.apache.geode.cache.DiskStore;
import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.distributed.internal.ReplyException;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.admin.remote.AdminResponse;
import org.apache.geode.internal.admin.remote.CliLegacyMessage;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.logging.LogService;
/**
* A request to from an admin VM to all non admin members to start a backup.
In the prepare phase of
@@ -34,66 +27,31 @@ import org.apache.geode.internal.logging.LogService;
* the backup.
*/
public class FlushToDiskRequest extends CliLegacyMessage {
- private static final Logger logger = LogService.getLogger();
- private final DM dm;
- private final FlushToDiskProcessor replyProcessor;
+ private final FlushToDiskFactory flushToDiskFactory;
public FlushToDiskRequest() {
super();
- this.dm = null;
- this.replyProcessor = null;
- }
-
- private FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients)
{
- this(dm, recipients, new FlushToDiskProcessor(dm, recipients));
+ this.flushToDiskFactory = new FlushToDiskFactory();
}
- FlushToDiskRequest(DM dm, Set<InternalDistributedMember> recipients,
- FlushToDiskProcessor replyProcessor) {
- this.dm = dm;
+ FlushToDiskRequest(InternalDistributedMember sender,
Set<InternalDistributedMember> recipients,
+ int processorId, FlushToDiskFactory flushToDiskFactory) {
+ this.setSender(sender);
setRecipients(recipients);
- this.replyProcessor = replyProcessor;
- this.msgId = this.replyProcessor.getProcessorId();
- }
-
- public static void send(DM dm, Set recipients) {
- FlushToDiskRequest request = new FlushToDiskRequest(dm, recipients);
- request.send();
- }
-
- void send() {
- dm.putOutgoing(this);
-
- AdminResponse response = createResponse(dm);
-
- try {
- replyProcessor.waitForReplies();
- } catch (ReplyException e) {
- if (!(e.getCause() instanceof CancelException)) {
- throw e;
- }
- } catch (InterruptedException e) {
- logger.warn(e);
- }
-
- response.setSender(dm.getDistributionManagerId());
- replyProcessor.process(response, false);
+ this.msgId = processorId;
+ this.flushToDiskFactory = flushToDiskFactory;
}
@Override
- protected AdminResponse createResponse(DM dm) {
- InternalCache cache = dm.getCache();
- if (cache != null) {
- cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush);
- }
-
- return new FlushToDiskResponse(getSender());
+ public int getDSFID() {
+ return FLUSH_TO_DISK_REQUEST;
}
@Override
- public int getDSFID() {
- return FLUSH_TO_DISK_REQUEST;
+ protected AdminResponse createResponse(DM dm) {
+ flushToDiskFactory.createFlushToDisk(dm.getCache()).run();
+ return flushToDiskFactory.createResponse(getSender());
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskFactoryTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskFactoryTest.java
new file mode 100644
index 0000000..cfb62ab
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskFactoryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.persistence.PersistentID;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class FlushToDiskFactoryTest {
+
+ private FlushToDiskFactory flushToDiskFactory;
+
+ private DM dm;
+ private InternalDistributedMember sender;
+ private Set<InternalDistributedMember> recipients;
+ private InternalDistributedMember member;
+ private InternalCache cache;
+
+ @Before
+ public void setUp() throws Exception {
+ dm = mock(DM.class);
+ sender = mock(InternalDistributedMember.class);
+ member = mock(InternalDistributedMember.class);
+ cache = mock(InternalCache.class);
+
+ recipients = new HashSet<>();
+
+ when(dm.getSystem()).thenReturn(mock(InternalDistributedSystem.class));
+ when(dm.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+
+ flushToDiskFactory = new FlushToDiskFactory();
+ }
+
+ @Test
+ public void createReplyProcessorReturnsFlushToDiskReplyProcessor() throws
Exception {
+ assertThat(flushToDiskFactory.createReplyProcessor(dm, recipients))
+ .isInstanceOf(FlushToDiskProcessor.class);
+ }
+
+ @Test
+ public void createRequestReturnsFlushToDiskRequest() throws Exception {
+ assertThat(flushToDiskFactory.createRequest(sender, recipients, 1))
+ .isInstanceOf(FlushToDiskRequest.class);
+ }
+
+ @Test
+ public void createFlushToDiskReturnsFlushToDisk() throws Exception {
+
assertThat(flushToDiskFactory.createFlushToDisk(cache)).isInstanceOf(FlushToDisk.class);
+ }
+
+ @Test
+ public void createResponseReturnsFlushToDiskResponse() {
+
assertThat(flushToDiskFactory.createResponse(member)).isInstanceOf(FlushToDiskResponse.class);
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskOperationTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskOperationTest.java
new file mode 100644
index 0000000..a79b43f
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskOperationTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import static org.mockito.Mockito.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.ReplyException;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class FlushToDiskOperationTest {
+
+ private DM dm;
+ private InternalCache cache;
+ private Set<InternalDistributedMember> recipients;
+
+ private InternalDistributedMember sender;
+
+ private FlushToDiskFactory flushToDiskFactory;
+ private FlushToDiskProcessor flushToDiskReplyProcessor;
+ private FlushToDiskRequest flushToDiskRequest;
+ private FlushToDisk flushToDisk;
+
+ private FlushToDiskOperation flushToDiskOperation;
+
+ @Before
+ public void setUp() throws Exception {
+ dm = mock(DM.class);
+ cache = mock(InternalCache.class);
+
+ flushToDiskReplyProcessor = mock(FlushToDiskProcessor.class);
+ flushToDiskRequest = mock(FlushToDiskRequest.class);
+ flushToDisk = mock(FlushToDisk.class);
+
+ flushToDiskFactory = mock(FlushToDiskFactory.class);
+
+ sender = mock(InternalDistributedMember.class, "sender");
+ recipients = new HashSet<>();
+
+ flushToDiskOperation =
+ new FlushToDiskOperation(dm, sender, cache, recipients,
flushToDiskFactory);
+
+ when(flushToDiskReplyProcessor.getProcessorId()).thenReturn(42);
+
+ when(flushToDiskFactory.createReplyProcessor(eq(dm), eq(recipients)))
+ .thenReturn(flushToDiskReplyProcessor);
+ when(flushToDiskFactory.createRequest(eq(sender), eq(recipients), eq(42)))
+ .thenReturn(flushToDiskRequest);
+
when(flushToDiskFactory.createFlushToDisk(eq(cache))).thenReturn(flushToDisk);
+ }
+
+ @Test
+ public void sendShouldSendFlushToDiskMessage() throws Exception {
+ flushToDiskOperation.send();
+
+ verify(dm, times(1)).putOutgoing(flushToDiskRequest);
+ }
+
+ @Test
+ public void sendShouldHandleCancelExceptionFromWaitForReplies() throws
Exception {
+ ReplyException replyException =
+ new ReplyException("expected exception", new
CacheClosedException("expected exception"));
+ doThrow(replyException).when(flushToDiskReplyProcessor).waitForReplies();
+ flushToDiskOperation.send();
+ }
+
+ @Test
+ public void sendShouldHandleInterruptedExceptionFromWaitForReplies() throws
Exception {
+ doThrow(new InterruptedException("expected
exception")).when(flushToDiskReplyProcessor)
+ .waitForReplies();
+ flushToDiskOperation.send();
+ }
+
+ @Test(expected = ReplyException.class)
+ public void sendShouldThrowReplyExceptionWithNoCauseFromWaitForReplies()
throws Exception {
+ doThrow(new ReplyException("expected
exception")).when(flushToDiskReplyProcessor)
+ .waitForReplies();
+ flushToDiskOperation.send();
+ }
+
+ @Test(expected = ReplyException.class)
+ public void
sendShouldThrowReplyExceptionWithCauseThatIsNotACancelFromWaitForReplies()
+ throws Exception {
+ doThrow(new ReplyException("expected exception", new
RuntimeException("expected")))
+ .when(flushToDiskReplyProcessor).waitForReplies();
+ flushToDiskOperation.send();
+ }
+
+ @Test
+ public void sendShouldProcessLocallyBeforeWaitingForReplies() throws
Exception {
+ InOrder inOrder = inOrder(flushToDisk, flushToDiskReplyProcessor);
+ flushToDiskOperation.send();
+
+ inOrder.verify(flushToDisk, times(1)).run();
+ inOrder.verify(flushToDiskReplyProcessor, times(1)).waitForReplies();
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
index f1ea67d..113bb70 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FlushToDiskRequestTest.java
@@ -14,33 +14,18 @@
*/
package org.apache.geode.internal.cache.backup;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collection;
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
import java.util.HashSet;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.InOrder;
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.DiskStore;
import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.distributed.internal.ReplyException;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.admin.remote.AdminResponse;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -49,130 +34,49 @@ public class FlushToDiskRequestTest {
private FlushToDiskRequest flushToDiskRequest;
- private FlushToDiskProcessor replyProcessor;
private DM dm;
- private InternalCache cache;
-
- private DiskStore diskStore1;
- private DiskStore diskStore2;
- private Collection<DiskStore> diskStoreCollection;
-
- private InternalDistributedMember localMember;
- private InternalDistributedMember member1;
- private InternalDistributedMember member2;
-
private Set<InternalDistributedMember> recipients;
+ private int msgId;
+ private FlushToDiskFactory flushToDiskFactory;
+ private InternalDistributedMember sender;
+ private InternalCache cache;
+ private FlushToDisk flushToDisk;
@Before
public void setUp() throws Exception {
- // mocks here
- replyProcessor = mock(FlushToDiskProcessor.class);
dm = mock(DM.class);
+ sender = mock(InternalDistributedMember.class);
cache = mock(InternalCache.class);
- diskStore1 = mock(DiskStore.class);
- diskStore2 = mock(DiskStore.class);
-
- diskStoreCollection = new ArrayList<>();
- diskStoreCollection.add(diskStore1);
- diskStoreCollection.add(diskStore2);
-
- when(dm.getCache()).thenReturn(cache);
- when(dm.getDistributionManagerId()).thenReturn(localMember);
-
when(cache.listDiskStoresIncludingRegionOwned()).thenReturn(diskStoreCollection);
-
- localMember = mock(InternalDistributedMember.class);
- member1 = mock(InternalDistributedMember.class);
- member2 = mock(InternalDistributedMember.class);
+ flushToDiskFactory = mock(FlushToDiskFactory.class);
+ flushToDisk = mock(FlushToDisk.class);
+ msgId = 42;
recipients = new HashSet<>();
- recipients.add(member1);
- recipients.add(member2);
-
- flushToDiskRequest = new FlushToDiskRequest(dm, recipients,
replyProcessor);
- }
-
- @Test
- public void getRecipientsReturnsRecipientMembers() throws Exception {
-
assertThat(flushToDiskRequest.getRecipients()).hasSize(2).contains(member1,
member2);
- }
-
- @Test
- public void getRecipientsDoesNotIncludeNull() throws Exception {
- InternalDistributedMember nullMember = null;
-
- assertThat(flushToDiskRequest.getRecipients()).doesNotContain(nullMember);
- }
-
- @Test
- public void sendShouldUseDMToSendMessage() throws Exception {
- flushToDiskRequest.send();
-
- verify(dm, times(1)).putOutgoing(flushToDiskRequest);
- }
-
- @Test
- public void sendShouldWaitForRepliesFromRecipients() throws Exception {
- flushToDiskRequest.send();
-
- verify(replyProcessor, times(1)).waitForReplies();
- }
-
- @Test
- public void sendShouldInvokeProcessLocally() throws Exception {
- flushToDiskRequest.send();
-
- verify(replyProcessor, times(1)).process(any(AdminResponse.class),
eq(false));
- }
-
- @Test
- public void sendShouldFlushDiskStores() throws Exception {
- flushToDiskRequest.send();
- verify(diskStore1, times(1)).flush();
- verify(diskStore2, times(1)).flush();
- }
-
- @Test
- public void sendShouldFlushDiskStoresInLocalMemberBeforeWaitingForReplies()
throws Exception {
- InOrder inOrder = inOrder(diskStore1, diskStore2, replyProcessor);
-
- flushToDiskRequest.send();
-
- // assert that prepareForBackup is invoked before invoking waitForReplies
- inOrder.verify(diskStore1, times(1)).flush();
- inOrder.verify(diskStore2, times(1)).flush();
- inOrder.verify(replyProcessor, times(1)).waitForReplies();
- }
-
- @Test
- public void repliesWithFinishBackupResponse() throws Exception {
- flushToDiskRequest.send();
+ when(dm.getCache()).thenReturn(cache);
+ when(dm.getDistributionManagerId()).thenReturn(sender);
+
when(flushToDiskFactory.createFlushToDisk(eq(cache))).thenReturn(flushToDisk);
+
when(flushToDiskFactory.createResponse(eq(sender))).thenReturn(mock(FlushToDiskResponse.class));
- verify(replyProcessor, times(1)).process(any(FlushToDiskResponse.class),
eq(false));
+ flushToDiskRequest = new FlushToDiskRequest(sender, recipients, msgId,
flushToDiskFactory);
}
@Test
- public void
sendShouldCompleteIfWaitForRepliesThrowsReplyExceptionCausedByCacheClosedException()
- throws Exception {
- doThrow(new ReplyException(new
CacheClosedException())).when(replyProcessor).waitForReplies();
+ public void usesFactoryToCreateFlushToDisk() throws Exception {
+ flushToDiskRequest.createResponse(dm);
- flushToDiskRequest.send();
+ verify(flushToDiskFactory, times(1)).createFlushToDisk(eq(cache));
}
@Test
- public void
sendShouldThrowIfWaitForRepliesThrowsReplyExceptionNotCausedByCancelException()
- throws Exception {
- doThrow(new ReplyException(new
NullPointerException())).when(replyProcessor).waitForReplies();
+ public void usesFactoryToCreateResponse() throws Exception {
+ flushToDiskRequest.createResponse(dm);
- assertThatThrownBy(() ->
flushToDiskRequest.send()).isInstanceOf(ReplyException.class)
- .hasCauseInstanceOf(NullPointerException.class);
+ verify(flushToDiskFactory, times(1)).createResponse(eq(sender));
}
@Test
- public void sendCompletesWhenWaitForRepliesThrowsInterruptedException()
throws Exception {
- doThrow(new InterruptedException()).when(replyProcessor).waitForReplies();
-
- flushToDiskRequest.send();
+ public void returnsFlushToDiskResponse() throws Exception {
+
assertThat(flushToDiskRequest.createResponse(dm)).isInstanceOf(FlushToDiskResponse.class);
}
-
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
index 706ef27..f1700bc 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareBackupRequestTest.java
@@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].