This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 483efb0  GEODE-6295: Use InternalCacheBuilder for constructing GemFireCacheImpl (#3260)
483efb0 is described below

commit 483efb041c6ecbe13f3643ed7602ee69720f1645
Author: Kirk Lund <[email protected]>
AuthorDate: Tue Mar 5 11:06:27 2019 -0800

    GEODE-6295: Use InternalCacheBuilder for constructing GemFireCacheImpl (#3260)
    
    All code (product and tests) now use InternalCacheBuilder to construct
    instances of GemFireCacheImpl.
---
 .../java/org/apache/geode/cache/CacheFactory.java  | 312 ++++-------
 .../geode/cache/client/ClientCacheFactory.java     |   7 +-
 .../distributed/internal/ConnectionConfig.java     |  37 ++
 .../distributed/internal/ConnectionConfigImpl.java |  75 +++
 .../internal/InternalDistributedSystem.java        | 130 +++--
 .../apache/geode/internal/cache/CacheConfig.java   |   4 +-
 .../geode/internal/cache/CacheFactoryStatics.java  | 123 ++++
 .../geode/internal/cache/GemFireCacheImpl.java     |  93 +---
 .../apache/geode/internal/cache/InternalCache.java |   4 +
 .../geode/internal/cache/InternalCacheBuilder.java | 394 +++++++++++++
 .../cache/InternalCacheForClientAccess.java        |  10 +
 .../internal/cache/xmlcache/CacheCreation.java     |  10 +
 .../internal/ConnectionConfigImplTest.java         | 181 ++++++
 .../internal/InternalDistributedSystemTest.java    |  22 +-
 .../geode/internal/cache/GemFireCacheImplTest.java | 304 +++++-----
 ...ernalCacheBuilderAllowsMultipleSystemsTest.java | 468 ++++++++++++++++
 .../internal/cache/InternalCacheBuilderTest.java   | 618 +++++++++++++++++++++
 .../geode/internal/cache/wan/WANTestBase.java      |  23 +-
 18 files changed, 2315 insertions(+), 500 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/CacheFactory.java b/geode-core/src/main/java/org/apache/geode/cache/CacheFactory.java
index e824769..0d37e71 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/CacheFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/CacheFactory.java
@@ -14,32 +14,24 @@
  */
 package org.apache.geode.cache;
 
-import static 
org.apache.geode.distributed.internal.InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS;
-
 import java.util.Properties;
 
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.SecurityConfig;
 import org.apache.geode.internal.GemFireVersion;
-import org.apache.geode.internal.cache.CacheConfig;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.CacheFactoryStatics;
+import org.apache.geode.internal.cache.InternalCacheBuilder;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxSerializer;
 import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.AuthenticationRequiredException;
-import org.apache.geode.security.GemFireSecurityException;
 import org.apache.geode.security.PostProcessor;
 import org.apache.geode.security.SecurityManager;
 
 /**
  * Factory class used to create the singleton {@link Cache cache} and connect 
to the GemFire
  * singleton {@link DistributedSystem distributed system}. If the application 
wants to connect to
- * GemFire as a client it should use {@link 
org.apache.geode.cache.client.ClientCacheFactory}
- * instead.
+ * GemFire as a client it should use {@link ClientCacheFactory} instead.
  * <p>
  * Once the factory has been configured using its {@link #set(String, String)} 
method you produce a
  * {@link Cache} by calling the {@link #create()} method.
@@ -94,9 +86,7 @@ import org.apache.geode.security.SecurityManager;
  */
 public class CacheFactory {
 
-  private final Properties dsProps;
-
-  private final CacheConfig cacheConfig = new CacheConfig();
+  private final InternalCacheBuilder internalCacheBuilder;
 
   /**
    * Creates a default cache factory.
@@ -115,65 +105,7 @@ public class CacheFactory {
    * @since GemFire 6.5
    */
   public CacheFactory(Properties props) {
-    if (props == null) {
-      props = new Properties();
-    }
-    this.dsProps = props;
-  }
-
-  /**
-   * Sets a gemfire property that will be used when creating the Cache. For a 
list of valid GemFire
-   * properties and their meanings see {@link ConfigurationProperties}.
-   *
-   * @param name the name of the gemfire property
-   * @param value the value of the gemfire property
-   * @return a reference to this CacheFactory object
-   * @since GemFire 6.5
-   */
-  public CacheFactory set(String name, String value) {
-    this.dsProps.setProperty(name, value);
-    return this;
-  }
-
-  /**
-   * Creates a new cache that uses the specified {@code system}.
-   * <p>
-   * The {@code system} can specify a
-   * <A 
href="../distributed/DistributedSystem.html#cache-xml-file">"cache-xml-file"</a>
 property
-   * which will cause this creation to also create the regions, objects, and 
attributes declared in
-   * the file. The contents of the file must comply with the {@code 
"doc-files/cache8_0.dtd">} file.
-   * Note that when parsing the XML file {@link Declarable} classes are loaded 
using the current
-   * thread's {@linkplain Thread#getContextClassLoader context class loader}.
-   *
-   * @param system a {@code DistributedSystem} obtained by calling
-   *        {@link DistributedSystem#connect}.
-   *
-   * @return a {@code Cache} that uses the specified {@code system} for 
distribution.
-   *
-   * @throws IllegalArgumentException If {@code system} is not {@link 
DistributedSystem#isConnected
-   *         connected}.
-   * @throws CacheExistsException If an open cache already exists.
-   * @throws CacheXmlException If a problem occurs while parsing the 
declarative caching XML file.
-   * @throws TimeoutException If a {@link Region#put(Object, Object)} times 
out while initializing
-   *         the cache.
-   * @throws CacheWriterException If a {@code CacheWriterException} is thrown 
while initializing the
-   *         cache.
-   * @throws GatewayException If a {@code GatewayException} is thrown while 
initializing the cache.
-   * @throws RegionExistsException If the declarative caching XML file 
describes a region that
-   *         already exists (including the root region).
-   * @deprecated as of 6.5 use {@link #CacheFactory(Properties)} instead.
-   */
-  @Deprecated
-  public static synchronized Cache create(DistributedSystem system) throws 
CacheExistsException,
-      TimeoutException, CacheWriterException, GatewayException, 
RegionExistsException {
-    return create(system, false, new CacheConfig());
-  }
-
-  private static synchronized Cache create(DistributedSystem system, boolean 
existingOk,
-      CacheConfig cacheConfig) throws CacheExistsException, TimeoutException, 
CacheWriterException,
-      GatewayException, RegionExistsException {
-    // Moved code in this method to GemFireCacheImpl.create
-    return GemFireCacheImpl.create((InternalDistributedSystem) system, 
existingOk, cacheConfig);
+    internalCacheBuilder = new InternalCacheBuilder(props);
   }
 
   /**
@@ -203,134 +135,21 @@ public class CacheFactory {
    */
   public Cache create()
       throws TimeoutException, CacheWriterException, GatewayException, 
RegionExistsException {
-    synchronized (CacheFactory.class) {
-      DistributedSystem ds = null;
-      if (this.dsProps.isEmpty() && !ALLOW_MULTIPLE_SYSTEMS) {
-        // any ds will do
-        ds = InternalDistributedSystem.getConnectedInstance();
-        validateUsabilityOfSecurityCallbacks(ds);
-      }
-      if (ds == null) {
-        ds = InternalDistributedSystem.connectInternal(dsProps, new 
SecurityConfig(
-            this.cacheConfig.getSecurityManager(),
-            this.cacheConfig.getPostProcessor()));
-      }
-      return create(ds, true, this.cacheConfig);
-    }
-  }
-
-  /**
-   * Throws GemFireSecurityException if existing DistributedSystem connection 
cannot use specified
-   * SecurityManager or PostProcessor.
-   */
-  private void validateUsabilityOfSecurityCallbacks(DistributedSystem ds)
-      throws GemFireSecurityException {
-    if (ds == null) {
-      return;
-    }
-    // pre-existing DistributedSystem already has an incompatible 
SecurityService in use
-    if (this.cacheConfig.getSecurityManager() != null) {
-      // invalid configuration
-      throw new GemFireSecurityException(
-          "Existing DistributedSystem connection cannot use specified 
SecurityManager");
-    }
-    if (this.cacheConfig.getPostProcessor() != null) {
-      // invalid configuration
-      throw new GemFireSecurityException(
-          "Existing DistributedSystem connection cannot use specified 
PostProcessor");
-    }
-  }
-
-  /**
-   * Gets the instance of {@link Cache} produced by an earlier call to {@link 
#create()}.
-   *
-   * @param system the {@code DistributedSystem} the cache was created with.
-   * @return the {@link Cache} associated with the specified system.
-   * @throws CacheClosedException if a cache has not been created or the 
created one is
-   *         {@link Cache#isClosed closed}
-   */
-  public static Cache getInstance(DistributedSystem system) {
-    return basicGetInstance(system, false);
-  }
-
-  /**
-   * Gets the instance of {@link Cache} produced by an earlier call to {@link 
#create()} even if it
-   * has been closed.
-   *
-   * @param system the {@code DistributedSystem} the cache was created with.
-   * @return the {@link Cache} associated with the specified system.
-   * @throws CacheClosedException if a cache has not been created
-   * @since GemFire 3.5
-   */
-  public static Cache getInstanceCloseOk(DistributedSystem system) {
-    return basicGetInstance(system, true);
-  }
-
-  private static Cache basicGetInstance(DistributedSystem system, boolean 
closeOk) {
-    // Avoid synchronization if this is an initialization thread to avoid
-    // deadlock when messaging returns to this VM
-    final int initReq = LocalRegion.threadInitLevelRequirement();
-    if (initReq == LocalRegion.ANY_INIT || initReq == 
LocalRegion.BEFORE_INITIAL_IMAGE) { // fix bug
-                                                                               
           // 33471
-      return basicGetInstancePart2(system, closeOk);
-    } else {
-      synchronized (CacheFactory.class) {
-        return basicGetInstancePart2(system, closeOk);
-      }
-    }
-  }
-
-  private static Cache basicGetInstancePart2(DistributedSystem system, boolean 
closeOk) {
-    InternalCache instance = GemFireCacheImpl.getInstance();
-    if (instance == null) {
-      throw new CacheClosedException(
-          "A cache has not yet been created.");
-    } else {
-      if (instance.isClosed() && !closeOk) {
-        throw instance.getCacheClosedException(
-            "The cache has been closed.", null);
-      }
-      if (!instance.getDistributedSystem().equals(system)) {
-        throw instance.getCacheClosedException(
-            "A cache has not yet been created for the given distributed 
system.");
-      }
-      return instance;
-    }
-  }
-
-  /**
-   * Gets an arbitrary open instance of {@link Cache} produced by an earlier 
call to
-   * {@link #create()}.
-   *
-   * <p>
-   * WARNING: To avoid risk of deadlock, do not invoke getAnyInstance() from 
within any
-   * CacheCallback including CacheListener, CacheLoader, CacheWriter, 
TransactionListener,
-   * TransactionWriter. Instead use EntryEvent.getRegion().getCache(),
-   * RegionEvent.getRegion().getCache(), LoaderHelper.getRegion().getCache(), 
or
-   * TransactionEvent.getCache().
-   * </p>
-   *
-   * @throws CacheClosedException if a cache has not been created or the only 
created one is
-   *         {@link Cache#isClosed closed}
-   */
-  public static synchronized Cache getAnyInstance() {
-    InternalCache instance = GemFireCacheImpl.getInstance();
-    if (instance == null) {
-      throw new CacheClosedException(
-          "A cache has not yet been created.");
-    } else {
-      instance.getCancelCriterion().checkCancelInProgress(null);
-      return instance;
-    }
+    return internalCacheBuilder.create();
   }
 
   /**
-   * Returns the version of the cache implementation.
+   * Sets a gemfire property that will be used when creating the Cache. For a 
list of valid GemFire
+   * properties and their meanings see {@link ConfigurationProperties}.
    *
-   * @return the version of the cache implementation as a {@code String}
+   * @param name the name of the gemfire property
+   * @param value the value of the gemfire property
+   * @return a reference to this CacheFactory object
+   * @since GemFire 6.5
    */
-  public static String getVersion() {
-    return GemFireVersion.getGemFireVersion();
+  public CacheFactory set(String name, String value) {
+    internalCacheBuilder.set(name, value);
+    return this;
   }
 
   /**
@@ -351,7 +170,7 @@ public class CacheFactory {
    * @see org.apache.geode.pdx.PdxInstance
    */
   public CacheFactory setPdxReadSerialized(boolean readSerialized) {
-    this.cacheConfig.setPdxReadSerialized(readSerialized);
+    internalCacheBuilder.setPdxReadSerialized(readSerialized);
     return this;
   }
 
@@ -366,7 +185,7 @@ public class CacheFactory {
    * @return this CacheFactory
    */
   public CacheFactory setSecurityManager(SecurityManager securityManager) {
-    this.cacheConfig.setSecurityManager(securityManager);
+    internalCacheBuilder.setSecurityManager(securityManager);
     return this;
   }
 
@@ -381,7 +200,7 @@ public class CacheFactory {
    * @return this CacheFactory
    */
   public CacheFactory setPostProcessor(PostProcessor postProcessor) {
-    this.cacheConfig.setPostProcessor(postProcessor);
+    internalCacheBuilder.setPostProcessor(postProcessor);
     return this;
   }
 
@@ -396,7 +215,7 @@ public class CacheFactory {
    * @see PdxSerializer
    */
   public CacheFactory setPdxSerializer(PdxSerializer serializer) {
-    this.cacheConfig.setPdxSerializer(serializer);
+    internalCacheBuilder.setPdxSerializer(serializer);
     return this;
   }
 
@@ -412,7 +231,7 @@ public class CacheFactory {
    * @since GemFire 6.6
    */
   public CacheFactory setPdxDiskStore(String diskStoreName) {
-    this.cacheConfig.setPdxDiskStore(diskStoreName);
+    internalCacheBuilder.setPdxDiskStore(diskStoreName);
     return this;
   }
 
@@ -427,7 +246,7 @@ public class CacheFactory {
    * @since GemFire 6.6
    */
   public CacheFactory setPdxPersistent(boolean isPersistent) {
-    this.cacheConfig.setPdxPersistent(isPersistent);
+    internalCacheBuilder.setPdxPersistent(isPersistent);
     return this;
   }
 
@@ -446,7 +265,94 @@ public class CacheFactory {
    * @since GemFire 6.6
    */
   public CacheFactory setPdxIgnoreUnreadFields(boolean ignore) {
-    this.cacheConfig.setPdxIgnoreUnreadFields(ignore);
+    internalCacheBuilder.setPdxIgnoreUnreadFields(ignore);
     return this;
   }
+
+  /**
+   * Creates a new cache that uses the specified {@code system}.
+   * <p>
+   * The {@code system} can specify a
+   * <A 
href="../distributed/DistributedSystem.html#cache-xml-file">"cache-xml-file"</a>
 property
+   * which will cause this creation to also create the regions, objects, and 
attributes declared in
+   * the file. The contents of the file must comply with the {@code 
"doc-files/cache8_0.dtd">} file.
+   * Note that when parsing the XML file {@link Declarable} classes are loaded 
using the current
+   * thread's {@linkplain Thread#getContextClassLoader context class loader}.
+   *
+   * @param system a {@code DistributedSystem} obtained by calling
+   *        {@link DistributedSystem#connect}.
+   *
+   * @return a {@code Cache} that uses the specified {@code system} for 
distribution.
+   *
+   * @throws IllegalArgumentException If {@code system} is not {@link 
DistributedSystem#isConnected
+   *         connected}.
+   * @throws CacheExistsException If an open cache already exists.
+   * @throws CacheXmlException If a problem occurs while parsing the 
declarative caching XML file.
+   * @throws TimeoutException If a {@link Region#put(Object, Object)} times 
out while initializing
+   *         the cache.
+   * @throws CacheWriterException If a {@code CacheWriterException} is thrown 
while initializing the
+   *         cache.
+   * @throws GatewayException If a {@code GatewayException} is thrown while 
initializing the cache.
+   * @throws RegionExistsException If the declarative caching XML file 
describes a region that
+   *         already exists (including the root region).
+   * @deprecated as of 6.5 use {@link #CacheFactory(Properties)} instead.
+   */
+  @Deprecated
+  public static Cache create(DistributedSystem system) throws 
CacheExistsException,
+      TimeoutException, CacheWriterException, GatewayException, 
RegionExistsException {
+    return CacheFactoryStatics.create(system);
+  }
+
+  /**
+   * Gets the instance of {@link Cache} produced by an earlier call to {@link 
#create()}.
+   *
+   * @param system the {@code DistributedSystem} the cache was created with.
+   * @return the {@link Cache} associated with the specified system.
+   * @throws CacheClosedException if a cache has not been created or the 
created one is
+   *         {@link Cache#isClosed closed}
+   */
+  public static Cache getInstance(DistributedSystem system) {
+    return CacheFactoryStatics.getInstance(system);
+  }
+
+  /**
+   * Gets the instance of {@link Cache} produced by an earlier call to {@link 
#create()} even if it
+   * has been closed.
+   *
+   * @param system the {@code DistributedSystem} the cache was created with.
+   * @return the {@link Cache} associated with the specified system.
+   * @throws CacheClosedException if a cache has not been created
+   * @since GemFire 3.5
+   */
+  public static Cache getInstanceCloseOk(DistributedSystem system) {
+    return CacheFactoryStatics.getInstanceCloseOk(system);
+  }
+
+  /**
+   * Gets an arbitrary open instance of {@link Cache} produced by an earlier 
call to
+   * {@link #create()}.
+   *
+   * <p>
+   * WARNING: To avoid risk of deadlock, do not invoke getAnyInstance() from 
within any
+   * CacheCallback including CacheListener, CacheLoader, CacheWriter, 
TransactionListener,
+   * TransactionWriter. Instead use EntryEvent.getRegion().getCache(),
+   * RegionEvent.getRegion().getCache(), LoaderHelper.getRegion().getCache(), 
or
+   * TransactionEvent.getCache().
+   * </p>
+   *
+   * @throws CacheClosedException if a cache has not been created or the only 
created one is
+   *         {@link Cache#isClosed closed}
+   */
+  public static Cache getAnyInstance() {
+    return CacheFactoryStatics.getAnyInstance();
+  }
+
+  /**
+   * Returns the version of the cache implementation.
+   *
+   * @return the version of the cache implementation as a {@code String}
+   */
+  public static String getVersion() {
+    return GemFireVersion.getGemFireVersion();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java b/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java
index fe6c097..c1ad84e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/ClientCacheFactory.java
@@ -32,6 +32,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.GemFireVersion;
 import org.apache.geode.internal.cache.CacheConfig;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCacheBuilder;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxSerializer;
 import org.apache.geode.security.AuthenticationFailedException;
@@ -256,7 +257,11 @@ public class ClientCacheFactory {
 
         return instance;
       } else {
-        return GemFireCacheImpl.createClient(system, this.pf, cacheConfig);
+
+        return (InternalClientCache) new InternalCacheBuilder(cacheConfig)
+            .setIsClient(true)
+            .setPoolFactory(pf)
+            .create(system);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ConnectionConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ConnectionConfig.java
new file mode 100644
index 0000000..14c7f9b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ConnectionConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import org.apache.geode.distributed.internal.membership.QuorumChecker;
+
+/**
+ * Contains the distribution config and internal properties for connecting.
+ */
+public interface ConnectionConfig {
+  /**
+   * Indicates whether this config is reconnecting.
+   */
+  boolean isReconnecting();
+
+  /**
+   * Returns the quorum checker to use while reconnecting.
+   */
+  QuorumChecker quorumChecker();
+
+  /**
+   * Returns the distribution config.
+   */
+  DistributionConfig distributionConfig();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ConnectionConfigImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ConnectionConfigImpl.java
new file mode 100644
index 0000000..2869955
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ConnectionConfigImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DS_CONFIG_NAME;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DS_QUORUM_CHECKER_NAME;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DS_RECONNECTING_NAME;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import org.apache.geode.distributed.internal.membership.QuorumChecker;
+
+public class ConnectionConfigImpl implements ConnectionConfig {
+  private final boolean isReconnecting;
+  private final QuorumChecker quorumChecker;
+  private final DistributionConfigImpl distributionConfig;
+
+  ConnectionConfigImpl(Properties properties) {
+    isReconnecting = convert(properties.get(DS_RECONNECTING_NAME), 
Boolean.class,
+        () -> Boolean.FALSE);
+    quorumChecker = convert(properties.get(DS_QUORUM_CHECKER_NAME), 
QuorumChecker.class,
+        () -> null);
+    distributionConfig = convert(properties.get(DS_CONFIG_NAME), 
DistributionConfigImpl.class,
+        () -> new DistributionConfigImpl(removeNonUserProperties(properties)));
+  }
+
+  @Override
+  public boolean isReconnecting() {
+    return isReconnecting;
+  }
+
+  @Override
+  public QuorumChecker quorumChecker() {
+    return quorumChecker;
+  }
+
+  @Override
+  public DistributionConfig distributionConfig() {
+    return distributionConfig;
+  }
+
+  /**
+   * Remove the non distribution config properties so that they are not passed 
to the {@code
+   * DistributionConfigImpl} constructor
+   */
+  private static Properties removeNonUserProperties(Properties properties) {
+    Properties cleanedProperties = new Properties();
+    properties.forEach(cleanedProperties::put);
+    cleanedProperties.remove(DS_QUORUM_CHECKER_NAME);
+    cleanedProperties.remove(DS_RECONNECTING_NAME);
+    cleanedProperties.remove(DS_CONFIG_NAME);
+    return cleanedProperties;
+  }
+
+  /**
+   * Casts the object to the specified type, or returns the default value if 
the object cannot be
+   * cast.
+   */
+  private static <T> T convert(Object object, Class<T> type, Supplier<T> 
defaultValueSupplier) {
+    return type.isInstance(object) ? type.cast(object) : 
defaultValueSupplier.get();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 03410c5..044e1d1 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -17,6 +17,7 @@ package org.apache.geode.distributed.internal;
 
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
 
 import java.io.File;
 import java.io.IOException;
@@ -60,7 +61,6 @@ import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.annotations.internal.MutableForTesting;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
@@ -82,11 +82,11 @@ import org.apache.geode.internal.alerting.AlertLevel;
 import org.apache.geode.internal.alerting.AlertMessaging;
 import org.apache.geode.internal.alerting.AlertingService;
 import org.apache.geode.internal.alerting.AlertingSession;
-import org.apache.geode.internal.cache.CacheConfig;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheBuilder;
 import org.apache.geode.internal.cache.execute.FunctionServiceStats;
 import org.apache.geode.internal.cache.execute.FunctionStats;
 import org.apache.geode.internal.cache.execute.InternalFunctionService;
@@ -136,18 +136,21 @@ public class InternalDistributedSystem extends DistributedSystem
    * True if the user is allowed lock when memory resources appear to be 
overcommitted.
    */
   private static final boolean ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + 
"Cache.ALLOW_MEMORY_OVERCOMMIT");
+      Boolean.getBoolean(GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT");
   private static final Logger logger = LogService.getLogger();
 
   private static final String DISABLE_MANAGEMENT_PROPERTY =
-      DistributionConfig.GEMFIRE_PREFIX + "disableManagement";
+      GEMFIRE_PREFIX + "disableManagement";
+
+  public static final String ALLOW_MULTIPLE_SYSTEMS_PROPERTY =
+      GEMFIRE_PREFIX + "ALLOW_MULTIPLE_SYSTEMS";
 
   /**
    * Feature flag to enable multiple caches within a JVM.
    */
   @MutableForTesting
   public static boolean ALLOW_MULTIPLE_SYSTEMS =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + 
"ALLOW_MULTIPLE_SYSTEMS");
+      Boolean.getBoolean(ALLOW_MULTIPLE_SYSTEMS_PROPERTY);
 
   /**
    * If auto-reconnect is going on this will hold a reference to it
@@ -195,7 +198,7 @@ public class InternalDistributedSystem extends DistributedSystem
    * Otherwise, create a new InternalDistributedSystem with the given 
properties, or connect to an
    * existing one with the same properties.
    */
-  public static DistributedSystem connectInternal(Properties config,
+  public static InternalDistributedSystem connectInternal(Properties config,
       SecurityConfig securityConfig) {
     if (config == null) {
       config = new Properties();
@@ -209,7 +212,8 @@ public class InternalDistributedSystem extends DistributedSystem
       if (ClusterDistributionManager.isDedicatedAdminVM()) {
         // For a dedicated admin VM, check to see if there is already
         // a connect that will suit our purposes.
-        DistributedSystem existingSystem = getConnection(config);
+        InternalDistributedSystem existingSystem =
+            (InternalDistributedSystem) getConnection(config);
         if (existingSystem != null) {
           return existingSystem;
         }
@@ -352,12 +356,12 @@ public class InternalDistributedSystem extends DistributedSystem
    * 38407
    */
   public static final String DISABLE_SHUTDOWN_HOOK_PROPERTY =
-      DistributionConfig.GEMFIRE_PREFIX + "disableShutdownHook";
+      GEMFIRE_PREFIX + "disableShutdownHook";
 
   /**
    * A property to append to existing log-file instead of truncating it.
    */
-  public static final String APPEND_TO_LOG_FILE = 
DistributionConfig.GEMFIRE_PREFIX + "append-log";
+  public static final String APPEND_TO_LOG_FILE = GEMFIRE_PREFIX + 
"append-log";
 
   //////////////////// Configuration Fields ////////////////////
 
@@ -367,7 +371,7 @@ public class InternalDistributedSystem extends DistributedSystem
   private final DistributionConfig originalConfig;
 
   private final boolean statsDisabled =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "statsDisabled");
+      Boolean.getBoolean(GEMFIRE_PREFIX + "statsDisabled");
 
   /**
    * The config object to which most configuration work is delegated
@@ -452,21 +456,38 @@ public class InternalDistributedSystem extends 
DistributedSystem
   }
 
   /**
-   * creates a non-functional instance for testing
+   * Creates a non-functional instance for testing.
    *
-   * @param nonDefault - non-default distributed system properties
+   * @param distributionManager the distribution manager for the test instance
+   * @param properties properties to configure the test instance
    */
-  public static InternalDistributedSystem 
newInstanceForTesting(DistributionManager dm,
-      Properties nonDefault) {
-    InternalDistributedSystem sys = new InternalDistributedSystem(nonDefault);
-    sys.config = new RuntimeDistributionConfigImpl(sys);
-    sys.dm = dm;
-    sys.isConnected = true;
-    return sys;
+  public static InternalDistributedSystem newInstanceForTesting(
+      DistributionManager distributionManager, Properties properties) {
+    StatisticsManagerFactory statisticsManagerFactory = 
defaultStatisticsManagerFactory();
+
+    return newInstanceForTesting(
+        distributionManager, properties, statisticsManagerFactory);
   }
 
-  public static InternalDistributedSystem 
newInstanceForTesting(StatisticsManagerFactory factory) {
-    return new InternalDistributedSystem(new Properties(), factory);
+  /**
+   * Creates a non-functional instance for testing.
+   *
+   * @param distributionManager the distribution manager for the test instance
+   * @param properties properties to configure the test instance
+   * @param statisticsManagerFactory the statistics manager factory for the 
test instance
+   */
+  static InternalDistributedSystem newInstanceForTesting(DistributionManager 
distributionManager,
+      Properties properties, StatisticsManagerFactory 
statisticsManagerFactory) {
+    ConnectionConfigImpl connectionConfig = new 
ConnectionConfigImpl(properties);
+
+    InternalDistributedSystem internalDistributedSystem =
+        new InternalDistributedSystem(connectionConfig, 
statisticsManagerFactory);
+
+    internalDistributedSystem.config = new 
RuntimeDistributionConfigImpl(internalDistributedSystem);
+    internalDistributedSystem.dm = distributionManager;
+    internalDistributedSystem.isConnected = true;
+
+    return internalDistributedSystem;
   }
 
   public static boolean removeSystem(InternalDistributedSystem oldSystem) {
@@ -573,20 +594,40 @@ public class InternalDistributedSystem extends 
DistributedSystem
     reconnectAttemptCounter = 0;
   }
 
-  private InternalDistributedSystem(Properties properties) {
-    this(properties, defaultStatisticsManagerFactory());
+  /**
+   * Creates a new {@code InternalDistributedSystem} with the given 
configuration properties.
+   * Does all of the magic of finding the "default" values of properties.
+   * <p>
+   * See {@link #connect} for a list of exceptions that may be thrown.
+   *
+   * @param configurationProperties properties to configure the connection
+   */
+  private InternalDistributedSystem(Properties configurationProperties) {
+    this(new ConnectionConfigImpl(configurationProperties));
   }
 
   /**
-   * Creates a new <code>InternalDistributedSystem</code> with the given 
configuration properties.
-   * Does all of the magic of finding the "default" values of properties. See
-   * {@link DistributedSystem#connect} for a list of exceptions that may be 
thrown.
+   * Creates a new {@code InternalDistributedSystem} with the given 
configuration.
+   * <p>
+   * See {@link #connect} for a list of exceptions that may be thrown.
    *
-   * @param nonDefault The non-default configuration properties specified by 
the caller
+   * @param config the configuration for the connection
+   */
+  private InternalDistributedSystem(ConnectionConfig config) {
+    this(config, defaultStatisticsManagerFactory());
+    isReconnectingDS = config.isReconnecting();
+    quorumChecker = config.quorumChecker();
+  }
+
+  /**
+   * Creates a new {@code InternalDistributedSystem} with the given 
configuration.
+   * <p>
+   * See {@link #connect} for a list of exceptions that may be thrown.
    *
-   * @see DistributedSystem#connect
+   * @param config the configuration for the connection
+   * @param statisticsManagerFactory creates the statistics manager for this 
member
    */
-  private InternalDistributedSystem(Properties nonDefault,
+  private InternalDistributedSystem(ConnectionConfig config,
       StatisticsManagerFactory statisticsManagerFactory) {
     alertingSession = AlertingSession.create();
     alertingService = new AlertingService();
@@ -597,24 +638,7 @@ public class InternalDistributedSystem extends 
DistributedSystem
     // "precious" thread
     DSFIDFactory.registerTypes();
 
-    Object o = nonDefault.remove(DistributionConfig.DS_RECONNECTING_NAME);
-    if (o instanceof Boolean) {
-      this.isReconnectingDS = (Boolean) o;
-    } else {
-      this.isReconnectingDS = false;
-    }
-
-    o = nonDefault.remove(DistributionConfig.DS_QUORUM_CHECKER_NAME);
-    if (o instanceof QuorumChecker) {
-      this.quorumChecker = (QuorumChecker) o;
-    }
-
-    o = nonDefault.remove(DistributionConfig.DS_CONFIG_NAME);
-    if (o instanceof DistributionConfigImpl) {
-      this.originalConfig = (DistributionConfigImpl) o;
-    } else {
-      this.originalConfig = new DistributionConfigImpl(nonDefault);
-    }
+    originalConfig = config.distributionConfig();
 
     ((DistributionConfigImpl) 
this.originalConfig).checkForDisallowedDefaults(); // throws
                                                                                
  // IllegalStateEx
@@ -2446,8 +2470,7 @@ public class InternalDistributedSystem extends 
DistributedSystem
     if (this.isReconnectingDS && forcedDisconnect) {
       return false;
     }
-    synchronized (CacheFactory.class) { // bug #51335 - deadlock with app 
thread trying to create a
-                                        // cache
+    synchronized (InternalCacheBuilder.class) {
       synchronized (GemFireCacheImpl.class) {
         // bug 39329: must lock reconnectLock *after* the cache
         synchronized (this.reconnectLock) {
@@ -2702,11 +2725,9 @@ public class InternalDistributedSystem extends 
DistributedSystem
             do {
               retry = false;
               try {
-                CacheConfig config = new CacheConfig();
-                if (cacheXML != null) {
-                  config.setCacheXMLDescription(cacheXML);
-                }
-                cache = GemFireCacheImpl.create(this.reconnectDS, config);
+                cache = new InternalCacheBuilder()
+                    .setCacheXMLDescription(cacheXML)
+                    .create(reconnectDS);
 
                 if (!cache.isClosed()) {
                   createAndStartCacheServers(cacheServerCreation, cache);
@@ -3029,5 +3050,4 @@ public class InternalDistributedSystem extends 
DistributedSystem
       }
     };
   }
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheConfig.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheConfig.java
index b6d0046..3870247 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheConfig.java
@@ -17,7 +17,7 @@ package org.apache.geode.internal.cache;
 import java.util.List;
 
 import org.apache.geode.annotations.Immutable;
-import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.internal.cache.xmlcache.CacheServerCreation;
 import org.apache.geode.pdx.PdxSerializer;
 import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
@@ -169,7 +169,7 @@ public class CacheConfig {
     this.cacheServerCreation = servers;
   }
 
-  public void validateCacheConfig(InternalClientCache cacheInstance) {
+  public void validateCacheConfig(GemFireCache cacheInstance) {
     // To fix bug 44961 only validate our attributes against the existing cache
     // if they have been explicitly set by the set.
     // So all the following "ifs" check that "*UserSet" is true.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheFactoryStatics.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheFactoryStatics.java
new file mode 100644
index 0000000..28010c0
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheFactoryStatics.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Properties;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheExistsException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CacheXmlException;
+import org.apache.geode.cache.GatewayException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+
+/**
+ * Implementation of static methods in {@link CacheFactory} including factory 
create methods and
+ * singleton getters.
+ */
+public class CacheFactoryStatics {
+
+  /**
+   * @throws IllegalArgumentException If {@code system} is not {@link 
DistributedSystem#isConnected
+   *         connected}.
+   * @throws CacheExistsException If an open cache already exists.
+   * @throws CacheXmlException If a problem occurs while parsing the 
declarative caching XML file.
+   * @throws TimeoutException If a {@link Region#put(Object, Object)} times 
out while initializing
+   *         the cache.
+   * @throws CacheWriterException If a {@code CacheWriterException} is thrown 
while initializing the
+   *         cache.
+   * @throws GatewayException If a {@code GatewayException} is thrown while 
initializing the cache.
+   * @throws RegionExistsException If the declarative caching XML file 
describes a region that
+   *         already exists (including the root region).
+   * @deprecated as of 6.5 use {@link CacheFactory#CacheFactory(Properties)} 
instead.
+   */
+  @Deprecated
+  public static Cache create(DistributedSystem system) throws 
CacheExistsException,
+      TimeoutException, CacheWriterException, GatewayException, 
RegionExistsException {
+    return new InternalCacheBuilder()
+        .setIsExistingOk(false)
+        .create((InternalDistributedSystem) system);
+  }
+
+  /**
+   * @throws CacheClosedException if a cache has not been created or the 
created one is
+   *         {@link Cache#isClosed closed}
+   */
+  public static Cache getInstance(DistributedSystem system) {
+    return basicGetInstance(system, false);
+  }
+
+  /**
+   * @throws CacheClosedException if a cache has not been created
+   */
+  public static Cache getInstanceCloseOk(DistributedSystem system) {
+    return basicGetInstance(system, true);
+  }
+
+  /**
+   * @throws CacheClosedException if a cache has not been created or the only 
created one is
+   *         {@link Cache#isClosed closed}
+   */
+  public static Cache getAnyInstance() {
+    synchronized (InternalCacheBuilder.class) {
+      InternalCache instance = GemFireCacheImpl.getInstance();
+      if (instance == null) {
+        throw new CacheClosedException(
+            "A cache has not yet been created.");
+      } else {
+        instance.getCancelCriterion().checkCancelInProgress(null);
+        return instance;
+      }
+    }
+  }
+
+  private static Cache basicGetInstance(DistributedSystem system, boolean 
closeOk) {
+    // Avoid synchronization if this is an initialization thread to avoid
+    // deadlock when messaging returns to this VM
+    final int initReq = LocalRegion.threadInitLevelRequirement();
+    if (initReq == LocalRegion.ANY_INIT || initReq == 
LocalRegion.BEFORE_INITIAL_IMAGE) { // fix bug
+      // 33471
+      return basicGetInstancePart2(system, closeOk);
+    } else {
+      synchronized (InternalCacheBuilder.class) {
+        return basicGetInstancePart2(system, closeOk);
+      }
+    }
+  }
+
+  private static Cache basicGetInstancePart2(DistributedSystem system, boolean 
closeOk) {
+    InternalCache instance = GemFireCacheImpl.getInstance();
+    if (instance == null) {
+      throw new CacheClosedException(
+          "A cache has not yet been created.");
+    } else {
+      if (instance.isClosed() && !closeOk) {
+        throw instance.getCacheClosedException(
+            "The cache has been closed.", null);
+      }
+      if (!instance.getDistributedSystem().equals(system)) {
+        throw instance.getCacheClosedException(
+            "A cache has not yet been created for the given distributed 
system.");
+      }
+      return instance;
+    }
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 022f9a2..7f5caef 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.internal.cache;
 
-import static 
org.apache.geode.distributed.internal.InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS;
 import static 
org.apache.geode.distributed.internal.InternalDistributedSystem.getAnyInstance;
 
 import java.io.BufferedReader;
@@ -776,82 +775,17 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
     return cache;
   }
 
-  public static GemFireCacheImpl createClient(InternalDistributedSystem 
system, PoolFactory pf,
-      CacheConfig cacheConfig) {
-    return basicCreate(system, true, cacheConfig, pf, true, 
ASYNC_EVENT_LISTENERS, null);
-  }
-
-  public static GemFireCacheImpl create(InternalDistributedSystem system, 
CacheConfig cacheConfig) {
-    return basicCreate(system, true, cacheConfig, null, false, 
ASYNC_EVENT_LISTENERS, null);
-  }
-
-  static GemFireCacheImpl 
createWithAsyncEventListeners(InternalDistributedSystem system,
-      CacheConfig cacheConfig, TypeRegistry typeRegistry) {
-    return basicCreate(system, true, cacheConfig, null, false, true, 
typeRegistry);
-  }
-
-  public static Cache create(InternalDistributedSystem system, boolean 
existingOk,
-      CacheConfig cacheConfig) {
-    return basicCreate(system, existingOk, cacheConfig, null, false, 
ASYNC_EVENT_LISTENERS, null);
-  }
-
-  private static GemFireCacheImpl basicCreate(InternalDistributedSystem 
system, boolean existingOk,
-      CacheConfig cacheConfig, PoolFactory pf, boolean isClient, boolean 
asyncEventListeners,
-      TypeRegistry typeRegistry) throws CacheExistsException, TimeoutException,
-      CacheWriterException, GatewayException, RegionExistsException {
-    try {
-      synchronized (GemFireCacheImpl.class) {
-        GemFireCacheImpl instance = checkExistingCache(existingOk, 
cacheConfig, system);
-        if (instance == null) {
-          instance = new GemFireCacheImpl(isClient, pf, system, cacheConfig, 
asyncEventListeners,
-              typeRegistry);
-          system.setCache(instance);
-          instance.initialize();
-        } else {
-          system.setCache(instance);
-        }
-        return instance;
-      }
-    } catch (CacheXmlException | IllegalArgumentException e) {
-      logger.error(e.getLocalizedMessage()); // TODO: log the full stack trace 
or not?
-      throw e;
-    } catch (Error | RuntimeException e) {
-      logger.error(e);
-      throw e;
-    }
-  }
-
-  private static GemFireCacheImpl checkExistingCache(boolean existingOk, 
CacheConfig cacheConfig,
-      InternalDistributedSystem system) {
-    GemFireCacheImpl instance =
-        ALLOW_MULTIPLE_SYSTEMS ? (GemFireCacheImpl) system.getCache() : 
getInstance();
-
-    if (instance != null && !instance.isClosed()) {
-      if (existingOk) {
-        // Check if cache configuration matches.
-        cacheConfig.validateCacheConfig(instance);
-        return instance;
-      } else {
-        // instance.creationStack argument is for debugging...
-        throw new CacheExistsException(instance,
-            String.format("%s: An open cache already exists.",
-                instance),
-            instance.creationStack);
-      }
-    }
-    return null;
-  }
-
   /**
    * Creates a new instance of GemFireCache and populates it according to the 
{@code cache.xml}, if
    * appropriate.
    *
    * Currently only unit tests set the typeRegistry parameter to a non-null 
value
    */
-  private GemFireCacheImpl(boolean isClient, PoolFactory pf, 
InternalDistributedSystem system,
-      CacheConfig cacheConfig, boolean asyncEventListeners, TypeRegistry 
typeRegistry) {
+  GemFireCacheImpl(boolean isClient, PoolFactory poolFactory,
+      InternalDistributedSystem internalDistributedSystem,
+      CacheConfig cacheConfig, boolean useAsyncEventListeners, TypeRegistry 
typeRegistry) {
     this.isClient = isClient;
-    this.poolFactory = pf;
+    this.poolFactory = poolFactory;
     this.cacheConfig = cacheConfig; // do early for bug 43213
     this.pdxRegistry = typeRegistry;
 
@@ -861,8 +795,8 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
 
       // start JTA transaction manager within this synchronized block
       // to prevent race with cache close. fixes bug 43987
-      JNDIInvoker.mapTransactions(system);
-      this.system = system;
+      JNDIInvoker.mapTransactions(internalDistributedSystem);
+      this.system = internalDistributedSystem;
       this.dm = this.system.getDistributionManager();
 
       if (!isClient) {
@@ -881,7 +815,7 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
         this.securityService = SecurityServiceFactory.create();
       }
 
-      DistributionConfig systemConfig = system.getConfig();
+      DistributionConfig systemConfig = internalDistributedSystem.getConfig();
       if (!this.isClient && PoolManager.getAll().isEmpty()) {
         // We only support management on members of a distributed system
         // Should do this: if (!getSystem().isLoner()) {
@@ -913,7 +847,7 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
       this.cqService = CqServiceProvider.create(this);
 
       // Create the CacheStatistics
-      this.cachePerfStats = new CachePerfStats(system);
+      this.cachePerfStats = new CachePerfStats(internalDistributedSystem);
       CachePerfStats.enableClockStats = 
this.system.getConfig().getEnableTimeStatistics();
 
       this.transactionManager = new TXManagerImpl(this.cachePerfStats, this);
@@ -923,7 +857,7 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
 
       this.persistentMemberManager = new PersistentMemberManager();
 
-      if (asyncEventListeners) {
+      if (useAsyncEventListeners) {
         this.eventThreadPool = 
LoggingExecutors.newThreadPoolWithFixedFeed("Message Event Thread",
             command -> {
               ConnectionTable.threadWantsSharedResources();
@@ -1007,6 +941,12 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
   }
 
   @Override
+  public void throwCacheExistsException() {
+    throw new CacheExistsException(this, String.format("%s: An open cache 
already exists.", this),
+        creationStack);
+  }
+
+  @Override
   public HttpService getHttpService() {
     return httpService;
   }
@@ -1206,7 +1146,8 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
    * Perform initialization, solve the early escaped reference problem by 
putting publishing
    * references to this instance in this method (vs. the constructor).
    */
-  private void initialize() {
+  @Override
+  public void initialize() {
     for (CacheLifecycleListener listener : cacheLifecycleListeners) {
       listener.cacheCreated(this);
     }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 3d129f9..87fc1dc 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -372,4 +372,8 @@ public interface InternalCache extends Cache, 
Extensible<Cache>, CacheTime {
   InternalCacheForClientAccess getCacheForProcessingClientRequests();
 
   HttpService getHttpService();
+
+  void initialize();
+
+  void throwCacheExistsException();
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheBuilder.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheBuilder.java
new file mode 100644
index 0000000..342cef9
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheBuilder.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+import static 
org.apache.geode.distributed.internal.InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.CacheExistsException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CacheXmlException;
+import org.apache.geode.cache.GatewayException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.SecurityConfig;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.pdx.PdxSerializer;
+import org.apache.geode.pdx.internal.TypeRegistry;
+import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.security.AuthenticationRequiredException;
+import org.apache.geode.security.GemFireSecurityException;
+import org.apache.geode.security.PostProcessor;
+import org.apache.geode.security.SecurityManager;
+
+public class InternalCacheBuilder {
+  private static final Logger logger = LogService.getLogger();
+
+  private static final String USE_ASYNC_EVENT_LISTENERS_PROPERTY =
+      GEMFIRE_PREFIX + "Cache.ASYNC_EVENT_LISTENERS";
+
+  private static final boolean IS_EXISTING_OK_DEFAULT = true;
+  private static final boolean IS_CLIENT_DEFAULT = false;
+
+  private final Properties configProperties;
+  private final CacheConfig cacheConfig;
+  private final Supplier<InternalDistributedSystem> singletonSystemSupplier;
+  private final Supplier<InternalCache> singletonCacheSupplier;
+  private final InternalDistributedSystemConstructor 
internalDistributedSystemConstructor;
+  private final InternalCacheConstructor internalCacheConstructor;
+
+  private boolean isExistingOk = IS_EXISTING_OK_DEFAULT;
+  private boolean isClient = IS_CLIENT_DEFAULT;
+
+  /**
+   * Setting useAsyncEventListeners to true will invoke event listeners 
asynchronously.
+   *
+   * <p>
+   * Default is specified by system property {@code 
gemfire.Cache.ASYNC_EVENT_LISTENERS}.
+   */
+  private boolean useAsyncEventListeners = 
Boolean.getBoolean(USE_ASYNC_EVENT_LISTENERS_PROPERTY);
+
+  private PoolFactory poolFactory;
+  private TypeRegistry typeRegistry;
+
+  /**
+   * Creates a cache builder with default configuration properties.
+   */
+  public InternalCacheBuilder() {
+    this(new Properties(), new CacheConfig());
+  }
+
+  /**
+   * Creates a cache builder initialized with the given configuration 
properties. For a list of valid
+   * configuration properties and their meanings see {@link 
ConfigurationProperties}.
+   *
+   * @param configProperties the configuration properties to initialize the 
factory with.
+   */
+  public InternalCacheBuilder(Properties configProperties) {
+    this(configProperties == null ? new Properties() : configProperties, new 
CacheConfig());
+  }
+
+  /**
+   * Creates a cache builder with default configuration properties and the given cache config.
+   */
+  public InternalCacheBuilder(CacheConfig cacheConfig) {
+    this(new Properties(), cacheConfig);
+  }
+
+  private InternalCacheBuilder(Properties configProperties, CacheConfig 
cacheConfig) {
+    this(configProperties, cacheConfig, 
InternalDistributedSystem::getConnectedInstance,
+        InternalDistributedSystem::connectInternal,
+        GemFireCacheImpl::getInstance, GemFireCacheImpl::new);
+  }
+
+  @VisibleForTesting
+  InternalCacheBuilder(Properties configProperties,
+      CacheConfig cacheConfig,
+      Supplier<InternalDistributedSystem> singletonSystemSupplier,
+      InternalDistributedSystemConstructor 
internalDistributedSystemConstructor,
+      Supplier<InternalCache> singletonCacheSupplier,
+      InternalCacheConstructor internalCacheConstructor) {
+    this.configProperties = configProperties;
+    this.cacheConfig = cacheConfig;
+    this.singletonSystemSupplier = singletonSystemSupplier;
+    this.internalDistributedSystemConstructor = 
internalDistributedSystemConstructor;
+    this.internalCacheConstructor = internalCacheConstructor;
+    this.singletonCacheSupplier = singletonCacheSupplier;
+  }
+
+  /**
+   * @see CacheFactory#create()
+   *
+   * @throws CacheXmlException If a problem occurs while parsing the 
declarative caching XML file.
+   * @throws TimeoutException If a {@link Region#put(Object, Object)} times 
out while initializing
+   *         the cache.
+   * @throws CacheWriterException If a {@code CacheWriterException} is thrown 
while initializing the
+   *         cache.
+   * @throws GatewayException If a {@code GatewayException} is thrown while 
initializing the cache.
+   * @throws RegionExistsException If the declarative caching XML file 
describes a region that
+   *         already exists (including the root region).
+   * @throws IllegalStateException if cache already exists and is not 
compatible with the new
+   *         configuration.
+   * @throws AuthenticationFailedException if authentication fails.
+   * @throws AuthenticationRequiredException if the distributed system is in 
secure mode and this
+   *         new member is not configured with security credentials.
+   */
+  public InternalCache create()
+      throws TimeoutException, CacheWriterException, GatewayException, 
RegionExistsException {
+    synchronized (InternalCacheBuilder.class) {
+      InternalDistributedSystem internalDistributedSystem = 
findInternalDistributedSystem()
+          .orElseGet(() -> createInternalDistributedSystem());
+      return create(internalDistributedSystem);
+    }
+  }
+
+  /**
+   * @see CacheFactory#create(DistributedSystem)
+   *
+   * @throws IllegalArgumentException If {@code system} is not {@link 
DistributedSystem#isConnected
+   *         connected}.
+   * @throws CacheExistsException If an open cache already exists.
+   * @throws CacheXmlException If a problem occurs while parsing the 
declarative caching XML file.
+   * @throws TimeoutException If a {@link Region#put(Object, Object)} times 
out while initializing
+   *         the cache.
+   * @throws CacheWriterException If a {@code CacheWriterException} is thrown 
while initializing the
+   *         cache.
+   * @throws GatewayException If a {@code GatewayException} is thrown while 
initializing the cache.
+   * @throws RegionExistsException If the declarative caching XML file 
describes a region that
+   *         already exists (including the root region).
+   */
+  public InternalCache create(InternalDistributedSystem 
internalDistributedSystem)
+      throws TimeoutException, CacheWriterException, GatewayException, 
RegionExistsException {
+    requireNonNull(internalDistributedSystem, "internalDistributedSystem");
+    try {
+      synchronized (InternalCacheBuilder.class) {
+        synchronized (GemFireCacheImpl.class) {
+          InternalCache cache =
+              existingCache(internalDistributedSystem::getCache, 
singletonCacheSupplier);
+          if (cache == null) {
+
+            cache =
+                internalCacheConstructor.construct(isClient, poolFactory, 
internalDistributedSystem,
+                    cacheConfig, useAsyncEventListeners, typeRegistry);
+
+            internalDistributedSystem.setCache(cache);
+            cache.initialize();
+
+          } else {
+            internalDistributedSystem.setCache(cache);
+          }
+
+          return cache;
+        }
+      }
+    } catch (CacheXmlException | IllegalArgumentException e) {
+      logger.error(e.getLocalizedMessage());
+      throw e;
+    } catch (Error | RuntimeException e) {
+      logger.error(e);
+      throw e;
+    }
+  }
+
+  /**
+   * @see CacheFactory#set(String, String)
+   */
+  public InternalCacheBuilder set(String name, String value) {
+    configProperties.setProperty(name, value);
+    return this;
+  }
+
+  /**
+   * @see CacheFactory#setPdxReadSerialized(boolean)
+   */
+  public InternalCacheBuilder setPdxReadSerialized(boolean readSerialized) {
+    cacheConfig.setPdxReadSerialized(readSerialized);
+    return this;
+  }
+
+  /**
+   * @see CacheFactory#setSecurityManager(SecurityManager)
+   */
+  public InternalCacheBuilder setSecurityManager(SecurityManager 
securityManager) {
+    cacheConfig.setSecurityManager(securityManager);
+    return this;
+  }
+
+  /**
+   * @see CacheFactory#setPostProcessor(PostProcessor)
+   */
+  public InternalCacheBuilder setPostProcessor(PostProcessor postProcessor) {
+    cacheConfig.setPostProcessor(postProcessor);
+    return this;
+  }
+
+  /**
+   * Records the given PdxSerializer in the cache configuration.
+   *
+   * @return this builder
+   * @see CacheFactory#setPdxSerializer(PdxSerializer)
+   */
+  public InternalCacheBuilder setPdxSerializer(PdxSerializer serializer) {
+    cacheConfig.setPdxSerializer(serializer);
+    return this;
+  }
+
+  /**
+   * Records the PDX disk store name in the cache configuration.
+   *
+   * @return this builder
+   * @see CacheFactory#setPdxDiskStore(String)
+   */
+  public InternalCacheBuilder setPdxDiskStore(String diskStoreName) {
+    cacheConfig.setPdxDiskStore(diskStoreName);
+    return this;
+  }
+
+  /**
+   * Forwards the PDX persistent flag to the cache configuration.
+   *
+   * @return this builder
+   * @see CacheFactory#setPdxPersistent(boolean)
+   */
+  public InternalCacheBuilder setPdxPersistent(boolean isPersistent) {
+    cacheConfig.setPdxPersistent(isPersistent);
+    return this;
+  }
+
+  /**
+   * Forwards the PDX ignore-unread-fields flag to the cache configuration.
+   *
+   * @return this builder
+   * @see CacheFactory#setPdxIgnoreUnreadFields(boolean)
+   */
+  public InternalCacheBuilder setPdxIgnoreUnreadFields(boolean ignore) {
+    cacheConfig.setPdxIgnoreUnreadFields(ignore);
+    return this;
+  }
+
+  /**
+   * Passes the cache XML description to the cache configuration. A {@code null} argument is
+   * ignored, leaving the configuration untouched.
+   *
+   * @return this builder
+   */
+  public InternalCacheBuilder setCacheXMLDescription(String cacheXML) {
+    if (cacheXML != null) {
+      cacheConfig.setCacheXMLDescription(cacheXML);
+    }
+    return this;
+  }
+
+  /**
+   * Chooses what happens when an open cache already exists. If true, the existing cache is
+   * checked with {@code cacheConfig.validateCacheConfig} and reused; if false,
+   * {@code throwCacheExistsException()} is invoked on it.
+   *
+   * @param isExistingOk default is true.
+   * @return this builder
+   */
+  public InternalCacheBuilder setIsExistingOk(boolean isExistingOk) {
+    this.isExistingOk = isExistingOk;
+    return this;
+  }
+
+  /**
+   * Sets the client flag that is handed to the cache constructor when a new cache is built.
+   *
+   * @param isClient default is false.
+   * @return this builder
+   */
+  public InternalCacheBuilder setIsClient(boolean isClient) {
+    this.isClient = isClient;
+    return this;
+  }
+
+  /**
+   * Sets the async-event-listeners flag that is handed to the cache constructor when a new
+   * cache is built.
+   *
+   * @param useAsyncEventListeners default is specified by the system property
+   *        {@code gemfire.Cache.ASYNC_EVENT_LISTENERS}.
+   * @return this builder
+   */
+  public InternalCacheBuilder setUseAsyncEventListeners(boolean 
useAsyncEventListeners) {
+    this.useAsyncEventListeners = useAsyncEventListeners;
+    return this;
+  }
+
+  /**
+   * Sets the pool factory that is handed to the cache constructor when a new cache is built.
+   *
+   * @param poolFactory default is null.
+   * @return this builder
+   */
+  public InternalCacheBuilder setPoolFactory(PoolFactory poolFactory) {
+    this.poolFactory = poolFactory;
+    return this;
+  }
+
+  /**
+   * Sets the type registry that is handed to the cache constructor when a new cache is built.
+   *
+   * @param typeRegistry default is null.
+   * @return this builder
+   */
+  public InternalCacheBuilder setTypeRegistry(TypeRegistry typeRegistry) {
+    this.typeRegistry = typeRegistry;
+    return this;
+  }
+
+  /**
+   * Looks for an existing system that can be reused. The singleton supplier is consulted only
+   * when no configuration properties were set on this builder and multiple systems are not
+   * allowed; in every other case the result is empty.
+   *
+   * @return the system obtained from the singleton supplier, or empty if none was looked up
+   *         or the supplier returned {@code null}
+   * @throws GemFireSecurityException if a system was found but the cache configuration
+   *         carries a SecurityManager or PostProcessor
+   */
+  private Optional<InternalDistributedSystem> findInternalDistributedSystem() {
+    InternalDistributedSystem internalDistributedSystem = null;
+    if (configProperties.isEmpty() && !ALLOW_MULTIPLE_SYSTEMS) {
+      // any ds will do
+      internalDistributedSystem = singletonSystemSupplier.get();
+      validateUsabilityOfSecurityCallbacks(internalDistributedSystem, 
cacheConfig);
+    }
+    return Optional.ofNullable(internalDistributedSystem);
+  }
+
+  /**
+   * Constructs a new system from the configuration properties, bundling the SecurityManager
+   * and PostProcessor held by the cache configuration into a {@code SecurityConfig}.
+   */
+  private InternalDistributedSystem createInternalDistributedSystem() {
+    SecurityConfig securityConfig = new SecurityConfig(
+        cacheConfig.getSecurityManager(),
+        cacheConfig.getPostProcessor());
+
+    return internalDistributedSystemConstructor.construct(configProperties, 
securityConfig);
+  }
+
+  /**
+   * Picks the candidate cache and returns it only if it passes {@code validateExistingCache}.
+   * The candidate comes from {@code systemCacheSupplier} when multiple systems are allowed,
+   * otherwise from {@code singletonCacheSupplier}.
+   *
+   * @return the existing open cache, or {@code null} if the candidate is absent or closed
+   */
+  private InternalCache existingCache(Supplier<? extends InternalCache> 
systemCacheSupplier,
+      Supplier<? extends InternalCache> singletonCacheSupplier) {
+    InternalCache cache = ALLOW_MULTIPLE_SYSTEMS
+        ? systemCacheSupplier.get()
+        : singletonCacheSupplier.get();
+
+    if (validateExistingCache(cache)) {
+      return cache;
+    }
+
+    return null;
+  }
+
+  /**
+   * Decides whether {@code existingCache} can be returned in place of a new cache.
+   *
+   * <p>
+   * A {@code null} or closed cache is not usable. For an open cache, if {@code isExistingOk}
+   * is true then {@code cacheConfig.validateCacheConfig} is run against it; if it is false
+   * then {@code existingCache.throwCacheExistsException()} is invoked.
+   *
+   * <p>
+   * NOTE(review): the exception types are not visible in this class. Presumably an
+   * incompatible cacheConfig, and an existing cache when isExistingOk is false, each surface
+   * as an exception thrown by those two calls - confirm against their implementations.
+   *
+   * @return {@code false} if there is no open existing cache; {@code true} if one exists and
+   *         the call made for it returned normally
+   */
+  private boolean validateExistingCache(InternalCache existingCache) {
+    if (existingCache == null || existingCache.isClosed()) {
+      return false;
+    }
+
+    if (isExistingOk) {
+      cacheConfig.validateCacheConfig(existingCache);
+    } else {
+      existingCache.throwCacheExistsException();
+    }
+
+    return true;
+  }
+
+  /**
+   * Rejects reuse of an existing system when the cache configuration specifies security
+   * callbacks. Does nothing when {@code internalDistributedSystem} is {@code null}.
+   *
+   * @throws GemFireSecurityException if existing DistributedSystem connection cannot use
+   *         specified SecurityManager or PostProcessor.
+   */
+  private static void validateUsabilityOfSecurityCallbacks(
+      InternalDistributedSystem internalDistributedSystem, CacheConfig 
cacheConfig)
+      throws GemFireSecurityException {
+    if (internalDistributedSystem == null) {
+      return;
+    }
+    // pre-existing DistributedSystem already has an incompatible 
SecurityService in use
+    if (cacheConfig.getSecurityManager() != null) {
+      throw new GemFireSecurityException(
+          "Existing DistributedSystem connection cannot use specified 
SecurityManager");
+    }
+    if (cacheConfig.getPostProcessor() != null) {
+      throw new GemFireSecurityException(
+          "Existing DistributedSystem connection cannot use specified 
PostProcessor");
+    }
+  }
+
+  /**
+   * Constructs the {@link InternalCache} from the values collected by this builder. Declared
+   * package-private and marked {@code @VisibleForTesting}.
+   */
+  @VisibleForTesting
+  interface InternalCacheConstructor {
+    InternalCache construct(boolean isClient, PoolFactory poolFactory,
+        InternalDistributedSystem internalDistributedSystem, CacheConfig 
cacheConfig,
+        boolean useAsyncEventListeners, TypeRegistry typeRegistry);
+  }
+
+  /**
+   * Constructs the {@link InternalDistributedSystem} from configuration properties and a
+   * {@code SecurityConfig}. Declared package-private and marked {@code @VisibleForTesting}.
+   */
+  @VisibleForTesting
+  interface InternalDistributedSystemConstructor {
+    InternalDistributedSystem construct(Properties configProperties, 
SecurityConfig securityConfig);
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
index 9a305f3..e64c6a7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheForClientAccess.java
@@ -1223,4 +1223,14 @@ public class InternalCacheForClientAccess implements 
InternalCache {
   public HttpService getHttpService() {
     return delegate.getHttpService();
   }
+
+  @Override
+  public void initialize() {
+    // Intentionally a no-op: this call is not forwarded to the delegate.
+    // NOTE(review): presumably the delegate is already initialized when wrapped - confirm.
+  }
+
+  @Override
+  public void throwCacheExistsException() {
+    // forwarded unchanged to the wrapped cache
+    delegate.throwCacheExistsException();
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index a2e31e5..325ea81 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -2380,6 +2380,11 @@ public class CacheCreation implements InternalCache {
   }
 
   @Override
+  public void initialize() {
+    throw new UnsupportedOperationException("Should not be invoked");
+  }
+
+  @Override
   public URL getCacheXmlURL() {
     throw new UnsupportedOperationException("Should not be invoked");
   }
@@ -2427,6 +2432,11 @@ public class CacheCreation implements InternalCache {
   }
 
   @Override
+  public void throwCacheExistsException() {
+    throw new UnsupportedOperationException("Should not be invoked");
+  }
+
+  @Override
   public HttpService getHttpService() {
     throw new UnsupportedOperationException("Should not be invoked");
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/ConnectionConfigImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/ConnectionConfigImplTest.java
new file mode 100644
index 0000000..9d5a827
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/ConnectionConfigImplTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DS_CONFIG_NAME;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DS_QUORUM_CHECKER_NAME;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DS_RECONNECTING_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.Properties;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.membership.QuorumChecker;
+
+/**
+ * Unit tests for {@link ConnectionConfigImpl}.
+ *
+ * <p>
+ * Every test follows the same shape: populate a {@link Properties}, construct a
+ * {@code ConnectionConfigImpl} from it, then assert on a single accessor
+ * ({@code distributionConfig()}, {@code isReconnecting()} or {@code quorumChecker()}).
+ */
+public class ConnectionConfigImplTest {
+
+  // distributionConfig(): the DS_* marker entries must not show up in the resulting props
+
+  @Test
+  public void distributionConfig_doesNotContainDsQuorumCheckerProperty() {
+    QuorumChecker quorumChecker = mock(QuorumChecker.class);
+    Properties properties = new Properties();
+    properties.put(DS_QUORUM_CHECKER_NAME, quorumChecker);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    DistributionConfigImpl result = (DistributionConfigImpl) 
config.distributionConfig();
+    assertThat(result.getProps()).doesNotContainKey(DS_QUORUM_CHECKER_NAME);
+  }
+
+  @Test
+  public void distributionConfig_doesNotContainDsReconnectingProperty() {
+    Properties properties = new Properties();
+    properties.put(DS_RECONNECTING_NAME, Boolean.TRUE);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    DistributionConfigImpl result = (DistributionConfigImpl) 
config.distributionConfig();
+    assertThat(result.getProps()).doesNotContainKey(DS_RECONNECTING_NAME);
+  }
+
+  @Test
+  public void distributionConfig_doesNotContainDsConfigProperty() {
+    Properties properties = new Properties();
+    properties.put(DS_CONFIG_NAME, mock(DistributionConfig.class));
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    DistributionConfigImpl result = (DistributionConfigImpl) 
config.distributionConfig();
+    assertThat(result.getProps()).doesNotContainKey(DS_CONFIG_NAME);
+  }
+
+  // isReconnecting(): expected true only when DS_RECONNECTING_NAME maps to Boolean.TRUE
+
+  @Test
+  public void isReconnecting_isTrue_ifReconnectingPropertyIsTrue() {
+    Properties properties = new Properties();
+
+    properties.put(DS_RECONNECTING_NAME, Boolean.TRUE);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.isReconnecting()).isTrue();
+  }
+
+  @Test
+  public void isReconnecting_isFalse_ifReconnectingPropertyIsFalse() {
+    Properties properties = new Properties();
+
+    properties.put(DS_RECONNECTING_NAME, Boolean.FALSE);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.isReconnecting()).isFalse();
+  }
+
+  @Test
+  public void isReconnecting_isFalse_ifReconnectingPropertyDoesNotExist() {
+    Properties properties = new Properties();
+
+    // no-op on the freshly created Properties; spells out the "absent" precondition
+    properties.remove(DS_RECONNECTING_NAME);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.isReconnecting()).isFalse();
+  }
+
+  @Test
+  public void isReconnecting_isFalse_ifReconnectingPropertyIsNotBoolean() {
+    Properties properties = new Properties();
+
+    properties.put(DS_RECONNECTING_NAME, "a string, not a boolean");
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.isReconnecting()).isFalse();
+  }
+
+  // quorumChecker(): the property value when it is a QuorumChecker, otherwise null
+
+  @Test
+  public void 
quorumChecker_returnsQuorumCheckerProperty_ifPropertyIsAQuorumChecker() {
+    QuorumChecker quorumCheckerFromProperties = mock(QuorumChecker.class);
+    Properties properties = new Properties();
+    properties.put(DS_QUORUM_CHECKER_NAME, quorumCheckerFromProperties);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.quorumChecker())
+        .isSameAs(quorumCheckerFromProperties);
+  }
+
+  @Test
+  public void quorumChecker_returnsNull_ifQuorumCheckerPropertyDoesNotExist() {
+    Properties properties = new Properties();
+    // no-op on the freshly created Properties; spells out the "absent" precondition
+    properties.remove(DS_QUORUM_CHECKER_NAME);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.quorumChecker()).isNull();
+  }
+
+  @Test
+  public void 
quorumChecker_returnsNull_ifQuorumCheckerPropertyIsNotAQuorumChecker() {
+    Properties properties = new Properties();
+
+    properties.put(DS_QUORUM_CHECKER_NAME, "a string, not a quorum checker");
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.quorumChecker()).isNull();
+  }
+
+  // distributionConfig(): the DS_CONFIG_NAME value when it is a DistributionConfigImpl,
+  // otherwise some other DistributionConfigImpl instance
+
+  @Test
+  public void 
distributionConfig_returnsConfigProperty_ifPropertyIsADistributionConfigImpl() {
+    DistributionConfigImpl distributionConfigFromProperties =
+        new DistributionConfigImpl(new Properties());
+    Properties properties = new Properties();
+    properties.put(DS_CONFIG_NAME, distributionConfigFromProperties);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.distributionConfig())
+        .isSameAs(distributionConfigFromProperties);
+  }
+
+  @Test
+  public void 
distributionConfig_returnsDistributionConfigImpl_ifConfigPropertyDoesNotExist() 
{
+    Properties properties = new Properties();
+    // no-op on the freshly created Properties; spells out the "absent" precondition
+    properties.remove(DS_CONFIG_NAME);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.distributionConfig())
+        .isInstanceOf(DistributionConfigImpl.class);
+  }
+
+  @Test
+  public void 
distributionConfig_returnsDistributionConfigImpl_ifConfigPropertyIsNotADistributionConfigImpl()
 {
+    String distributionConfigFromProperties = "a string, not a distribution 
config";
+    Properties properties = new Properties();
+    properties.put(DS_CONFIG_NAME, distributionConfigFromProperties);
+
+    ConnectionConfigImpl config = new ConnectionConfigImpl(properties);
+
+    assertThat(config.distributionConfig())
+        .isInstanceOf(DistributionConfigImpl.class)
+        .isNotSameAs(distributionConfigFromProperties);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemTest.java
index b800923..3c8ad85 100644
--- 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/InternalDistributedSystemTest.java
@@ -27,6 +27,7 @@ import static org.mockito.MockitoAnnotations.initMocks;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.Properties;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -38,7 +39,11 @@ import org.apache.geode.StatisticsType;
 import org.apache.geode.internal.statistics.StatisticsManager;
 import org.apache.geode.internal.statistics.StatisticsManagerFactory;
 
+/**
+ * Unit tests for {@link InternalDistributedSystem}.
+ */
 public class InternalDistributedSystemTest {
+
   private static final String STATISTIC_NAME = "statistic-name";
   private static final String STATISTIC_DESCRIPTION = "statistic-description";
   private static final String STATISTIC_UNITS = "statistic-units";
@@ -46,21 +51,24 @@ public class InternalDistributedSystemTest {
   private static final String STATISTICS_TEXT_ID = "statistics-text-id";
   private static final long STATISTICS_NUMERIC_ID = 2349;
 
+  @Mock(name = "autowiredDistributionManager")
+  private DistributionManager distributionManager;
+
   @Mock(name = "autowiredStatisticsManagerFactory")
-  public StatisticsManagerFactory statisticsManagerFactory;
+  private StatisticsManagerFactory statisticsManagerFactory;
 
   @Mock(name = "autowiredStatisticsManager")
-  public StatisticsManager statisticsManager;
+  private StatisticsManager statisticsManager;
 
   private InternalDistributedSystem internalDistributedSystem;
 
   @Before
-  public void setup() {
+  public void setUp() {
     initMocks(this);
     when(statisticsManagerFactory.create(any(), anyLong(), anyBoolean()))
         .thenReturn(statisticsManager);
-    internalDistributedSystem =
-        
InternalDistributedSystem.newInstanceForTesting(statisticsManagerFactory);
+    internalDistributedSystem = 
InternalDistributedSystem.newInstanceForTesting(distributionManager,
+        new Properties(), statisticsManagerFactory);
   }
 
   @Test
@@ -73,8 +81,8 @@ public class InternalDistributedSystemTest {
         .create(any(), anyLong(), eq(false)))
             .thenReturn(statisticsManagerCreatedByFactory);
 
-    InternalDistributedSystem result =
-        
InternalDistributedSystem.newInstanceForTesting(statisticsManagerFactory);
+    InternalDistributedSystem result = InternalDistributedSystem
+        .newInstanceForTesting(distributionManager, new Properties(), 
statisticsManagerFactory);
 
     assertThat(result.getStatisticsManager())
         .isSameAs(statisticsManagerCreatedByFactory);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index b87ee8d..b278f08 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -29,10 +29,8 @@ import java.io.NotSerializableException;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.SerializationException;
@@ -47,27 +45,22 @@ import 
org.apache.geode.internal.cache.eviction.OffHeapEvictor;
 import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.fake.Fakes;
 
+/**
+ * Unit tests for {@link GemFireCacheImpl}.
+ */
 public class GemFireCacheImplTest {
 
-  private InternalDistributedSystem distributedSystem;
-  private GemFireCacheImpl cache;
-  private CacheConfig cacheConfig;
-
-  @Before
-  public void setup() {
-    distributedSystem = Fakes.distributedSystem();
-    cacheConfig = new CacheConfig();
-  }
+  private GemFireCacheImpl gemFireCacheImpl;
 
   @After
   public void tearDown() {
-    if (cache != null) {
-      cache.close();
+    if (gemFireCacheImpl != null) {
+      gemFireCacheImpl.close();
     }
   }
 
   @Test
-  public void shouldBeMockable() throws Exception {
+  public void canBeMocked() {
     GemFireCacheImpl mockGemFireCacheImpl = mock(GemFireCacheImpl.class);
     InternalResourceManager mockInternalResourceManager = 
mock(InternalResourceManager.class);
 
@@ -79,198 +72,198 @@ public class GemFireCacheImplTest {
 
   @Test
   public void checkPurgeCCPTimer() {
-    InternalDistributedSystem ds = Fakes.distributedSystem();
-    CacheConfig cc = new CacheConfig();
-    TypeRegistry typeRegistry = mock(TypeRegistry.class);
-    SystemTimer ccpTimer = mock(SystemTimer.class);
-    GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, 
cc, typeRegistry);
-    try {
-      gfc.setCCPTimer(ccpTimer);
-      for (int i = 1; i < GemFireCacheImpl.PURGE_INTERVAL; i++) {
-        gfc.purgeCCPTimer();
-        verify(ccpTimer, times(0)).timerPurge();
-      }
-      gfc.purgeCCPTimer();
-      verify(ccpTimer, times(1)).timerPurge();
-      for (int i = 1; i < GemFireCacheImpl.PURGE_INTERVAL; i++) {
-        gfc.purgeCCPTimer();
-        verify(ccpTimer, times(1)).timerPurge();
-      }
-      gfc.purgeCCPTimer();
-      verify(ccpTimer, times(2)).timerPurge();
-    } finally {
-      gfc.close();
+    SystemTimer cacheClientProxyTimer = mock(SystemTimer.class);
+
+    gemFireCacheImpl = createGemFireCacheWithTypeRegistry();
+
+    gemFireCacheImpl.setCCPTimer(cacheClientProxyTimer);
+    for (int i = 1; i < GemFireCacheImpl.PURGE_INTERVAL; i++) {
+      gemFireCacheImpl.purgeCCPTimer();
+      verify(cacheClientProxyTimer, times(0)).timerPurge();
     }
+    gemFireCacheImpl.purgeCCPTimer();
+    verify(cacheClientProxyTimer, times(1)).timerPurge();
+    for (int i = 1; i < GemFireCacheImpl.PURGE_INTERVAL; i++) {
+      gemFireCacheImpl.purgeCCPTimer();
+      verify(cacheClientProxyTimer, times(1)).timerPurge();
+    }
+    gemFireCacheImpl.purgeCCPTimer();
+    verify(cacheClientProxyTimer, times(2)).timerPurge();
   }
 
   @Test
   public void checkEvictorsClosed() {
-    InternalDistributedSystem ds = Fakes.distributedSystem();
-    CacheConfig cc = new CacheConfig();
-    TypeRegistry typeRegistry = mock(TypeRegistry.class);
-    SystemTimer ccpTimer = mock(SystemTimer.class);
-    HeapEvictor he = mock(HeapEvictor.class);
-    OffHeapEvictor ohe = mock(OffHeapEvictor.class);
-    GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, 
cc, typeRegistry);
-    try {
-      gfc.setHeapEvictor(he);
-      gfc.setOffHeapEvictor(ohe);
-    } finally {
-      gfc.close();
-    }
-    verify(he, times(1)).close();
-    verify(ohe, times(1)).close();
+    HeapEvictor heapEvictor = mock(HeapEvictor.class);
+    OffHeapEvictor offHeapEvictor = mock(OffHeapEvictor.class);
+
+    gemFireCacheImpl = createGemFireCacheWithTypeRegistry();
+
+    gemFireCacheImpl.setHeapEvictor(heapEvictor);
+    gemFireCacheImpl.setOffHeapEvictor(offHeapEvictor);
+    gemFireCacheImpl.close();
+
+    verify(heapEvictor).close();
+    verify(offHeapEvictor).close();
   }
 
   @Test
   public void registerPdxMetaDataThrowsIfInstanceNotSerializable() {
-    InternalDistributedSystem ds = Fakes.distributedSystem();
-    CacheConfig cc = new CacheConfig();
-    TypeRegistry typeRegistry = mock(TypeRegistry.class);
-    GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, 
cc, typeRegistry);
-    try {
-      assertThatThrownBy(() -> gfc.registerPdxMetaData(new Object()))
-          
.isInstanceOf(SerializationException.class).hasMessage("Serialization failed")
-          .hasCauseInstanceOf(NotSerializableException.class);
-    } finally {
-      gfc.close();
-    }
+    gemFireCacheImpl = createGemFireCacheWithTypeRegistry();
+
+    assertThatThrownBy(() -> gemFireCacheImpl.registerPdxMetaData(new 
Object()))
+        .isInstanceOf(SerializationException.class).hasMessage("Serialization 
failed")
+        .hasCauseInstanceOf(NotSerializableException.class);
   }
 
   @Test
   public void registerPdxMetaDataThrowsIfInstanceIsNotPDX() {
-    InternalDistributedSystem ds = Fakes.distributedSystem();
-    CacheConfig cc = new CacheConfig();
-    TypeRegistry typeRegistry = mock(TypeRegistry.class);
-    GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, 
cc, typeRegistry);
-    try {
-      assertThatThrownBy(() -> gfc.registerPdxMetaData("string"))
-          .isInstanceOf(SerializationException.class)
-          .hasMessage("The instance is not PDX serializable");
-    } finally {
-      gfc.close();
-    }
+    gemFireCacheImpl = createGemFireCacheWithTypeRegistry();
+
+    assertThatThrownBy(() -> gemFireCacheImpl.registerPdxMetaData("string"))
+        .isInstanceOf(SerializationException.class)
+        .hasMessage("The instance is not PDX serializable");
   }
 
   @Test
   public void checkThatAsyncEventListenersUseAllThreadsInPool() {
-    InternalDistributedSystem ds = Fakes.distributedSystem();
-    CacheConfig cc = new CacheConfig();
-    TypeRegistry typeRegistry = mock(TypeRegistry.class);
-    GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(ds, 
cc, typeRegistry);
-    try {
-      ThreadPoolExecutor executor = (ThreadPoolExecutor) 
gfc.getEventThreadPool();
-      assertEquals(0, executor.getCompletedTaskCount());
-      assertEquals(0, executor.getActiveCount());
-      int MAX_THREADS = GemFireCacheImpl.EVENT_THREAD_LIMIT;
-      final CountDownLatch cdl = new CountDownLatch(MAX_THREADS);
-      for (int i = 1; i <= MAX_THREADS; i++) {
-        executor.execute(() -> {
-          cdl.countDown();
-          try {
-            cdl.await();
-          } catch (InterruptedException e) {
-          }
-        });
-      }
-      await().timeout(90, TimeUnit.SECONDS)
-          .untilAsserted(() -> assertEquals(MAX_THREADS, 
executor.getCompletedTaskCount()));
-    } finally {
-      gfc.close();
+    gemFireCacheImpl = createGemFireCacheWithTypeRegistry();
+
+    ThreadPoolExecutor eventThreadPoolExecutor =
+        (ThreadPoolExecutor) gemFireCacheImpl.getEventThreadPool();
+    assertEquals(0, eventThreadPoolExecutor.getCompletedTaskCount());
+    assertEquals(0, eventThreadPoolExecutor.getActiveCount());
+
+    int MAX_THREADS = GemFireCacheImpl.EVENT_THREAD_LIMIT;
+    final CountDownLatch threadLatch = new CountDownLatch(MAX_THREADS);
+    for (int i = 1; i <= MAX_THREADS; i++) {
+      eventThreadPoolExecutor.execute(() -> {
+        threadLatch.countDown();
+        try {
+          threadLatch.await();
+        } catch (InterruptedException e) {
+        }
+      });
     }
+
+    await().untilAsserted(
+        () -> 
assertThat(eventThreadPoolExecutor.getCompletedTaskCount()).isEqualTo(MAX_THREADS));
   }
 
   @Test
   public void 
getCacheClosedExceptionWithNoReasonOrCauseGivesExceptionWithoutEither() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
-    CacheClosedException e = cache.getCacheClosedException(null, null);
-    assertThat(e.getCause()).isNull();
-    assertThat(e.getMessage()).isNull();
+    gemFireCacheImpl = createGemFireCacheImpl();
+
+    CacheClosedException cacheClosedException =
+        gemFireCacheImpl.getCacheClosedException(null, null);
+
+    assertThat(cacheClosedException.getCause()).isNull();
+    assertThat(cacheClosedException.getMessage()).isNull();
   }
 
   @Test
   public void getCacheClosedExceptionWithNoCauseGivesExceptionWithReason() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
-    CacheClosedException e = cache.getCacheClosedException("message", null);
-    assertThat(e.getCause()).isNull();
-    assertThat(e.getMessage()).isEqualTo("message");
+    gemFireCacheImpl = createGemFireCacheImpl();
+
+    CacheClosedException cacheClosedException = gemFireCacheImpl
+        .getCacheClosedException("message", null);
+
+    assertThat(cacheClosedException.getCause()).isNull();
+    assertThat(cacheClosedException.getMessage()).isEqualTo("message");
   }
 
   @Test
   public void 
getCacheClosedExceptionReturnsExceptionWithProvidedCauseAndReason() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
+    gemFireCacheImpl = createGemFireCacheImpl();
     Throwable cause = new Throwable();
-    CacheClosedException e = cache.getCacheClosedException("message", cause);
-    assertThat(e.getCause()).isEqualTo(cause);
-    assertThat(e.getMessage()).isEqualTo("message");
+
+    CacheClosedException cacheClosedException = gemFireCacheImpl
+        .getCacheClosedException("message", cause);
+
+    assertThat(cacheClosedException.getCause()).isEqualTo(cause);
+    assertThat(cacheClosedException.getMessage()).isEqualTo("message");
   }
 
   @Test
   public void 
getCacheClosedExceptionWhenCauseGivenButDisconnectExceptionExistsPrefersCause() 
{
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
-    cache.disconnectCause = new Throwable("disconnectCause");
+    gemFireCacheImpl = createGemFireCacheImpl();
+    gemFireCacheImpl.disconnectCause = new Throwable("disconnectCause");
     Throwable cause = new Throwable();
-    CacheClosedException e = cache.getCacheClosedException("message", cause);
-    assertThat(e.getCause()).isEqualTo(cause);
-    assertThat(e.getMessage()).isEqualTo("message");
+
+    CacheClosedException cacheClosedException = gemFireCacheImpl
+        .getCacheClosedException("message", cause);
+
+    assertThat(cacheClosedException.getCause()).isEqualTo(cause);
+    assertThat(cacheClosedException.getMessage()).isEqualTo("message");
   }
 
   @Test
   public void 
getCacheClosedExceptionWhenNoCauseGivenProvidesDisconnectExceptionIfExists() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
+    gemFireCacheImpl = createGemFireCacheImpl();
     Throwable disconnectCause = new Throwable("disconnectCause");
-    cache.disconnectCause = disconnectCause;
-    CacheClosedException e = cache.getCacheClosedException("message", null);
-    assertThat(e.getCause()).isEqualTo(disconnectCause);
-    assertThat(e.getMessage()).isEqualTo("message");
+    gemFireCacheImpl.disconnectCause = disconnectCause;
+
+    CacheClosedException cacheClosedException = gemFireCacheImpl
+        .getCacheClosedException("message", null);
+
+    assertThat(cacheClosedException.getCause()).isEqualTo(disconnectCause);
+    assertThat(cacheClosedException.getMessage()).isEqualTo("message");
   }
 
   @Test
   public void getCacheClosedExceptionReturnsExceptionWithProvidedReason() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
-    CacheClosedException e = cache.getCacheClosedException("message");
-    assertThat(e.getMessage()).isEqualTo("message");
-    assertThat(e.getCause()).isNull();
+    gemFireCacheImpl = createGemFireCacheImpl();
+
+    CacheClosedException cacheClosedException = 
gemFireCacheImpl.getCacheClosedException("message");
+
+    assertThat(cacheClosedException.getMessage()).isEqualTo("message");
+    assertThat(cacheClosedException.getCause()).isNull();
   }
 
   @Test
   public void 
getCacheClosedExceptionReturnsExceptionWithNoMessageWhenReasonNotGiven() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
-    CacheClosedException e = cache.getCacheClosedException(null);
-    assertThat(e.getMessage()).isEqualTo(null);
-    assertThat(e.getCause()).isNull();
+    gemFireCacheImpl = createGemFireCacheImpl();
+
+    CacheClosedException cacheClosedException = 
gemFireCacheImpl.getCacheClosedException(null);
+
+    assertThat(cacheClosedException.getMessage()).isEqualTo(null);
+    assertThat(cacheClosedException.getCause()).isNull();
   }
 
   @Test
   public void getCacheClosedExceptionReturnsExceptionWithDisconnectCause() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
+    gemFireCacheImpl = createGemFireCacheImpl();
     Throwable disconnectCause = new Throwable("disconnectCause");
-    cache.disconnectCause = disconnectCause;
-    CacheClosedException e = cache.getCacheClosedException("message");
-    assertThat(e.getMessage()).isEqualTo("message");
-    assertThat(e.getCause()).isEqualTo(disconnectCause);
+    gemFireCacheImpl.disconnectCause = disconnectCause;
+
+    CacheClosedException cacheClosedException = 
gemFireCacheImpl.getCacheClosedException("message");
+
+    assertThat(cacheClosedException.getMessage()).isEqualTo("message");
+    assertThat(cacheClosedException.getCause()).isEqualTo(disconnectCause);
   }
 
   @Test
   public void removeGatewayReceiverShouldRemoveFromReceiversList() {
     GatewayReceiver receiver = mock(GatewayReceiver.class);
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
-    cache.addGatewayReceiver(receiver);
-    assertEquals(1, cache.getGatewayReceivers().size());
-    cache.removeGatewayReceiver(receiver);
-    assertEquals(0, cache.getGatewayReceivers().size());
+    gemFireCacheImpl = createGemFireCacheImpl();
+    gemFireCacheImpl.addGatewayReceiver(receiver);
+    assertEquals(1, gemFireCacheImpl.getGatewayReceivers().size());
+
+    gemFireCacheImpl.removeGatewayReceiver(receiver);
+
+    assertEquals(0, gemFireCacheImpl.getGatewayReceivers().size());
   }
 
 
   @Test
   public void removeFromCacheServerShouldRemoveFromCacheServersList() {
-    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
-    CacheServer cacheServer = cache.addCacheServer(false);
-    assertEquals(1, cache.getCacheServers().size());
-    cache.removeCacheServer(cacheServer);
-    assertEquals(0, cache.getCacheServers().size());
-  }
+    gemFireCacheImpl = createGemFireCacheImpl();
+    CacheServer cacheServer = gemFireCacheImpl.addCacheServer(false);
+    assertEquals(1, gemFireCacheImpl.getCacheServers().size());
 
+    gemFireCacheImpl.removeCacheServer(cacheServer);
+
+    assertEquals(0, gemFireCacheImpl.getCacheServers().size());
+  }
 
   @Test
   public void testIsMisConfigured() {
@@ -326,18 +319,35 @@ public class GemFireCacheImplTest {
 
   @Test
   public void clientCacheWouldNotRequestClusterConfig() {
-    // we will need to set the value to true so that we can use a mock cache
+    // we will need to set the value to true so that we can use a mock gemFireCacheImpl
     boolean oldValue = InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS;
     InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS = true;
 
-    cache = mock(GemFireCacheImpl.class);
-    when(distributedSystem.getCache()).thenReturn(cache);
-    GemFireCacheImpl.createClient(distributedSystem, null, cacheConfig);
+    InternalDistributedSystem internalDistributedSystem = 
Fakes.distributedSystem();
+    gemFireCacheImpl = mock(GemFireCacheImpl.class);
+    when(internalDistributedSystem.getCache()).thenReturn(gemFireCacheImpl);
+
+    new InternalCacheBuilder()
+        .setIsClient(true)
+        .create(internalDistributedSystem);
 
-    verify(cache, times(0)).requestSharedConfiguration();
-    verify(cache, times(0)).applyJarAndXmlFromClusterConfig();
+    verify(gemFireCacheImpl, times(0)).requestSharedConfiguration();
+    verify(gemFireCacheImpl, times(0)).applyJarAndXmlFromClusterConfig();
 
     // reset it back to the old value
     InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS = oldValue;
   }
+
+  private static GemFireCacheImpl createGemFireCacheImpl() {
+    return (GemFireCacheImpl) new 
InternalCacheBuilder().create(Fakes.distributedSystem());
+  }
+
+  private static GemFireCacheImpl createGemFireCacheWithTypeRegistry() {
+    InternalDistributedSystem internalDistributedSystem = 
Fakes.distributedSystem();
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    return (GemFireCacheImpl) new InternalCacheBuilder()
+        .setUseAsyncEventListeners(true)
+        .setTypeRegistry(typeRegistry)
+        .create(internalDistributedSystem);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/InternalCacheBuilderAllowsMultipleSystemsTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/InternalCacheBuilderAllowsMultipleSystemsTest.java
new file mode 100644
index 0000000..baba452
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/InternalCacheBuilderAllowsMultipleSystemsTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static 
org.apache.geode.internal.cache.InternalCacheBuilderAllowsMultipleSystemsTest.CacheState.CLOSED;
+import static 
org.apache.geode.internal.cache.InternalCacheBuilderAllowsMultipleSystemsTest.CacheState.OPEN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import org.assertj.core.api.ThrowableAssert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.CacheExistsException;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.internal.cache.InternalCacheBuilder.InternalCacheConstructor;
+import 
org.apache.geode.internal.cache.InternalCacheBuilder.InternalDistributedSystemConstructor;
+
+/**
+ * Unit tests for {@link InternalCacheBuilder} when
+ * {@code InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS} is set to true.
+ */
+public class InternalCacheBuilderAllowsMultipleSystemsTest {
+
+  private static final int ANY_SYSTEM_ID = 12;
+  private static final String ANY_MEMBER_NAME = "a-member-name";
+
+  private static final Supplier<InternalDistributedSystem> 
THROWING_SYSTEM_SUPPLIER =
+      () -> {
+        throw new AssertionError("throwing system supplier");
+      };
+  private static final Supplier<InternalCache> THROWING_CACHE_SUPPLIER =
+      () -> {
+        throw new AssertionError("throwing cache supplier");
+      };
+
+  private static final InternalDistributedSystemConstructor 
THROWING_SYSTEM_CONSTRUCTOR =
+      (a, b) -> {
+        throw new AssertionError("throwing system constructor");
+      };
+  private static final InternalCacheConstructor THROWING_CACHE_CONSTRUCTOR =
+      (a, b, c, d, e, f) -> {
+        throw new AssertionError("throwing cache constructor");
+      };
+
+  @Before
+  public void setUp() {
+    InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS = true;
+  }
+
+  @After
+  public void tearDown() {
+    InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS = false;
+  }
+
+  @Test
+  public void create_throwsNullPointerException_ifConfigPropertiesIsNull() {
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        null, new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, constructorOf(constructedSystem()),
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache()));
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .create());
+
+    assertThat(thrown).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void create_throwsNullPointerException_andCacheConfigIsNull() {
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), null,
+        THROWING_SYSTEM_SUPPLIER, constructorOf(constructedSystem()),
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache()));
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .create());
+
+    assertThat(thrown).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void create_constructsSystem_withGivenProperties_ifNoSystemExists() {
+    InternalCache constructedCache = constructedCache();
+
+    InternalDistributedSystemConstructor systemConstructor = 
constructorOf(constructedSystem());
+    Properties configProperties = new Properties();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        configProperties, new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, systemConstructor,
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache));
+
+    internalCacheBuilder.create();
+
+    verify(systemConstructor).construct(same(configProperties), any());
+  }
+
+  @Test
+  public void create_returnsConstructedCache_ifNoSystemExists() {
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, constructorOf(constructedSystem()),
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache));
+
+    InternalCache result = internalCacheBuilder.create();
+
+    assertThat(result).isSameAs(constructedCache);
+  }
+
+  @Test
+  public void 
create_setsConstructedCache_onConstructedSystem_ifNoSystemExists() {
+    InternalDistributedSystem constructedSystem = constructedSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, constructorOf(constructedSystem),
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache));
+
+    internalCacheBuilder.create();
+
+    verify(constructedSystem).setCache(same(constructedCache));
+  }
+
+  @Test
+  public void 
create_setsConstructedSystem_onConstructedCache_ifNoSystemExists() {
+    InternalDistributedSystem constructedSystem = constructedSystem();
+
+    InternalCacheConstructor cacheConstructor = 
constructorOf(constructedCache());
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, constructorOf(constructedSystem),
+        THROWING_CACHE_SUPPLIER, cacheConstructor);
+
+    internalCacheBuilder.create();
+
+    verify(cacheConstructor).construct(anyBoolean(), any(), 
same(constructedSystem), any(),
+        anyBoolean(), any());
+  }
+
+
+  @Test
+  public void createWithSystem_throwsNullPointerException_ifSystemIsNull() {
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder.create(null));
+
+    assertThat(thrown).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void 
createWithSystem_returnsConstructedCache_ifSystemCacheDoesNotExist() {
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache));
+
+    InternalCache result = internalCacheBuilder
+        .create(givenSystem());
+
+    assertThat(result).isSameAs(constructedCache);
+  }
+
+  @Test
+  public void 
createWithSystem_setsConstructedCache_onGivenSystem_ifSystemCacheDoesNotExist() 
{
+    InternalDistributedSystem givenSystem = givenSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache));
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    verify(givenSystem).setCache(same(constructedCache));
+  }
+
+  @Test
+  public void 
createWithSystem_setsGivenSystem_onConstructedCache_ifSystemCacheDoesNotExist() 
{
+    InternalDistributedSystem givenSystem = givenSystem();
+
+    InternalCacheConstructor cacheConstructor = 
constructorOf(constructedCache());
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, cacheConstructor);
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    verify(cacheConstructor).construct(anyBoolean(), any(), same(givenSystem), 
any(),
+        anyBoolean(), any());
+  }
+
+  @Test
+  public void createWithSystem_returnsConstructedCache_ifSystemCacheIsClosed() 
{
+    InternalDistributedSystem givenSystem = givenSystemWithCache(CLOSED);
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache));
+
+    InternalCache result = internalCacheBuilder
+        .create(givenSystem);
+
+    assertThat(result).isSameAs(constructedCache);
+  }
+
+  @Test
+  public void 
createWithSystem_setsConstructedCache_onGivenSystem_ifSystemCacheIsClosed() {
+    InternalDistributedSystem givenSystem = givenSystemWithCache(CLOSED);
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, constructorOf(constructedCache));
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    verify(givenSystem).setCache(same(constructedCache));
+  }
+
+  @Test
+  public void 
createWithSystem_setsGivenSystem_onConstructedCache_ifSystemCacheIsClosed() {
+    InternalDistributedSystem givenSystem = givenSystemWithCache(CLOSED);
+
+    InternalCacheConstructor cacheConstructor = 
constructorOf(constructedCache());
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, cacheConstructor);
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    verify(cacheConstructor).construct(anyBoolean(), any(), same(givenSystem), 
any(),
+        anyBoolean(), any());
+  }
+
+  @Test
+  public void 
createWithSystem_throwsCacheExistsException_ifSystemCacheIsOpen_butExistingNotOk()
 {
+    InternalDistributedSystem givenSystem = givenSystemWithCache(OPEN);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(false)
+        .create(givenSystem));
+
+    assertThat(thrown).isInstanceOf(CacheExistsException.class);
+  }
+
+  @Test
+  public void 
createWithSystem_doesNotSetSystemCache_onGivenSystem__ifSystemCacheIsOpen_butExistingNotOk()
 {
+    InternalDistributedSystem givenSystem = givenSystemWithCache(OPEN);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+
+    ignoreThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(false)
+        .create(givenSystem));
+
+    verify(givenSystem, never()).setCache(any());
+  }
+
+  @Test
+  public void 
createWithSystem_propagatesCacheConfigException_ifSystemCacheIsOpen_andExistingOk_butCacheIsIncompatible()
 {
+    InternalDistributedSystem givenSystem = givenSystemWithCache(OPEN);
+
+    Throwable thrownByCacheConfig = new IllegalStateException("incompatible");
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), throwingCacheConfig(thrownByCacheConfig),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem));
+
+    assertThat(thrown).isSameAs(thrownByCacheConfig);
+  }
+
+  @Test
+  public void 
createWithSystem_doesNotSetSystemCache_onGivenSystem_ifSystemCacheIsOpen_andExistingOk_butCacheIsNotCompatible()
 {
+    InternalDistributedSystem givenSystem = givenSystemWithCache(OPEN);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), throwingCacheConfig(new 
IllegalStateException("incompatible")),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+
+    ignoreThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem));
+
+    verify(givenSystem, never()).setCache(any());
+  }
+
+  @Test
+  public void 
createWithSystem_returnsSystemCache_ifSystemCacheIsOpen_andExistingOk_andCacheIsCompatible()
 {
+    InternalDistributedSystem givenSystem = givenSystem();
+    InternalCache systemCache = systemCache(givenSystem, OPEN);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+
+    InternalCache result = internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem);
+
+    assertThat(result).isSameAs(systemCache);
+  }
+
+  @Test
+  public void 
createWithSystem_setsSystemCache_onGivenSystem_ifSystemCacheIsOpen_andExistingOk_andCacheIsCompatible()
 {
+    InternalDistributedSystem givenSystem = givenSystem();
+    InternalCache systemCache = systemCache(givenSystem, OPEN);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+
+    internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem);
+
+    verify(givenSystem).setCache(same(systemCache));
+  }
+
+  private InternalDistributedSystem constructedSystem() {
+    return systemWith("constructedSystem", ANY_SYSTEM_ID, ANY_MEMBER_NAME);
+  }
+
+  private InternalDistributedSystem givenSystem() {
+    return systemWith("givenSystem", ANY_SYSTEM_ID, ANY_MEMBER_NAME);
+  }
+
+  private InternalDistributedSystem systemWith(String mockName, int systemId, 
String memberName) {
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class, 
mockName);
+    DistributionConfig distributionConfig = mock(DistributionConfig.class);
+    when(distributionConfig.getDistributedSystemId()).thenReturn(systemId);
+    when(system.getConfig()).thenReturn(distributionConfig);
+    when(system.getName()).thenReturn(memberName);
+    return system;
+  }
+
+  private InternalDistributedSystem givenSystemWithCache(CacheState state) {
+    InternalDistributedSystem system =
+        systemWith("givenSystemWithCache", ANY_SYSTEM_ID, ANY_MEMBER_NAME);
+    systemCache(system, state);
+    return system;
+  }
+
+  private static InternalCache constructedCache() {
+    return cache("constructedCache", OPEN);
+  }
+
+  private static InternalCache systemCache(InternalDistributedSystem 
givenSystem,
+      CacheState state) {
+    InternalCache cache = cache("systemCache", state);
+    when(givenSystem.getCache()).thenReturn(cache);
+    return cache;
+  }
+
+  private static InternalCache cache(String name, CacheState state) {
+    InternalCache cache = mock(InternalCache.class, name);
+    when(cache.isClosed()).thenReturn(state.isClosed());
+    doThrow(new CacheExistsException(cache, "cache exists"))
+        .when(cache).throwCacheExistsException();
+    return cache;
+  }
+
+  private static InternalDistributedSystemConstructor constructorOf(
+      InternalDistributedSystem constructedSystem) {
+    InternalDistributedSystemConstructor constructor =
+        mock(InternalDistributedSystemConstructor.class, "internal distributed 
system constructor");
+    when(constructor.construct(any(), any())).thenReturn(constructedSystem);
+    return constructor;
+  }
+
+  private static InternalCacheConstructor constructorOf(InternalCache 
constructedCache) {
+    InternalCacheConstructor constructor =
+        mock(InternalCacheConstructor.class, "internal cache constructor");
+    when(constructor.construct(anyBoolean(), any(), any(), any(), 
anyBoolean(), any()))
+        .thenReturn(constructedCache);
+    return constructor;
+  }
+
+  private static CacheConfig throwingCacheConfig(Throwable throwable) {
+    CacheConfig cacheConfig = mock(CacheConfig.class);
+    doThrow(throwable).when(cacheConfig).validateCacheConfig(any());
+    return cacheConfig;
+  }
+
+  private static void ignoreThrowable(ThrowableAssert.ThrowingCallable 
shouldRaiseThrowable) {
+    try {
+      shouldRaiseThrowable.call();
+    } catch (Throwable ignored) {
+    }
+  }
+
+  enum CacheState {
+    OPEN(false),
+    CLOSED(true);
+
+    private final boolean isClosed;
+
+    CacheState(boolean isClosed) {
+      this.isClosed = isClosed;
+    }
+
+    boolean isClosed() {
+      return isClosed;
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/InternalCacheBuilderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/InternalCacheBuilderTest.java
new file mode 100644
index 0000000..2814376
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/InternalCacheBuilderTest.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static 
org.apache.geode.internal.cache.InternalCacheBuilderTest.CacheState.CLOSED;
+import static 
org.apache.geode.internal.cache.InternalCacheBuilderTest.CacheState.OPEN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import org.apache.geode.cache.CacheExistsException;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.internal.cache.InternalCacheBuilder.InternalCacheConstructor;
+import 
org.apache.geode.internal.cache.InternalCacheBuilder.InternalDistributedSystemConstructor;
+
+/**
+ * Unit tests for {@link InternalCacheBuilder}.
+ */
+public class InternalCacheBuilderTest {
+
+  private static final int ANY_SYSTEM_ID = 12;
+  private static final String ANY_MEMBER_NAME = "a-member-name";
+
+  private static final Supplier<InternalDistributedSystem> 
THROWING_SYSTEM_SUPPLIER =
+      () -> {
+        throw new AssertionError("throwing system supplier");
+      };
+  private static final Supplier<InternalCache> THROWING_CACHE_SUPPLIER =
+      () -> {
+        throw new AssertionError("throwing cache supplier");
+      };
+
+  private static final InternalDistributedSystemConstructor 
THROWING_SYSTEM_CONSTRUCTOR =
+      (a, b) -> {
+        throw new AssertionError("throwing system constructor");
+      };
+  private static final InternalCacheConstructor THROWING_CACHE_CONSTRUCTOR =
+      (a, b, c, d, e, f) -> {
+        throw new AssertionError("throwing cache constructor");
+      };
+
+  @Mock
+  private Supplier<InternalDistributedSystem> nullSingletonSystemSupplier;
+
+  @Mock
+  private Supplier<InternalCache> nullSingletonCacheSupplier;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+
+    when(nullSingletonSystemSupplier.get()).thenReturn(null);
+    when(nullSingletonCacheSupplier.get()).thenReturn(null);
+
+    InternalDistributedSystem.ALLOW_MULTIPLE_SYSTEMS = false;
+  }
+
+  @Test
+  public void create_throwsNullPointerException_ifConfigPropertiesIsNull() {
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        null, new CacheConfig(),
+        nullSingletonSystemSupplier, constructorOf(constructedSystem()),
+        nullSingletonCacheSupplier, constructorOf(constructedCache()));
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .create());
+
+    assertThat(thrown).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void create_throwsNullPointerException_andCacheConfigIsNull() {
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), null,
+        nullSingletonSystemSupplier, constructorOf(constructedSystem()),
+        nullSingletonCacheSupplier, constructorOf(constructedCache()));
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .create());
+
+    assertThat(thrown).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void 
create_constructsSystem_withGivenProperties_ifNoSystemExists_andNoCacheExists() 
{
+    InternalCache constructedCache = constructedCache();
+
+    InternalDistributedSystemConstructor systemConstructor = 
constructorOf(constructedSystem());
+    Properties configProperties = new Properties();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        configProperties, new CacheConfig(),
+        nullSingletonSystemSupplier, systemConstructor,
+        nullSingletonCacheSupplier, constructorOf(constructedCache));
+
+    internalCacheBuilder
+        .create();
+
+    verify(systemConstructor).construct(same(configProperties), any());
+  }
+
+  @Test
+  public void create_returnsConstructedCache_ifNoSystemExists() {
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        nullSingletonSystemSupplier, constructorOf(constructedSystem()),
+        nullSingletonCacheSupplier, constructorOf(constructedCache));
+
+    InternalCache result = internalCacheBuilder
+        .create();
+
+    assertThat(result).isSameAs(constructedCache);
+  }
+
+  @Test
+  public void 
create_setsConstructedCache_onConstructedSystem_ifNoSystemExists() {
+    InternalDistributedSystem constructedSystem = constructedSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        nullSingletonSystemSupplier, constructorOf(constructedSystem),
+        nullSingletonCacheSupplier, constructorOf(constructedCache));
+
+    internalCacheBuilder
+        .create();
+
+    verify(constructedSystem).setCache(same(constructedCache));
+  }
+
+  @Test
+  public void 
create_setsConstructedSystem_onConstructedCache_ifNoSystemExists_() {
+    InternalDistributedSystem constructedSystem = constructedSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheConstructor cacheConstructor = 
constructorOf(constructedCache);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        nullSingletonSystemSupplier, constructorOf(constructedSystem),
+        nullSingletonCacheSupplier, cacheConstructor);
+
+    internalCacheBuilder
+        .create();
+
+    verify(cacheConstructor).construct(anyBoolean(), any(), 
same(constructedSystem), any(),
+        anyBoolean(), any());
+  }
+
+  @Test
+  public void 
create_returnsConstructedCache_ifSingletonSystemExists_andNoCacheExists() {
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem()), THROWING_SYSTEM_CONSTRUCTOR,
+        nullSingletonCacheSupplier, constructorOf(constructedCache));
+
+    InternalCache result = internalCacheBuilder
+        .create();
+
+    assertThat(result).isSameAs(constructedCache);
+  }
+
+  @Test
+  public void 
create_setsSingletonSystem_onConstructedCache_ifSingletonSystemExists_andNoCacheExists()
 {
+    InternalDistributedSystem singletonSystem = singletonSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheConstructor cacheConstructor = 
constructorOf(constructedCache);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem), THROWING_SYSTEM_CONSTRUCTOR,
+        nullSingletonCacheSupplier, cacheConstructor);
+
+    internalCacheBuilder
+        .create();
+
+    verify(singletonSystem).setCache(same(constructedCache));
+  }
+
+  @Test
+  public void 
create_setsConstructedCache_onSingletonSystem_ifSingletonSystemExists_andNoCacheExists()
 {
+    InternalDistributedSystem singletonSystem = singletonSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheConstructor cacheConstructor = 
constructorOf(constructedCache);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem), THROWING_SYSTEM_CONSTRUCTOR,
+        nullSingletonCacheSupplier, cacheConstructor);
+
+    internalCacheBuilder
+        .create();
+
+    verify(cacheConstructor).construct(anyBoolean(), any(), 
same(singletonSystem), any(),
+        anyBoolean(), any());
+  }
+
+  @Test
+  public void 
create_returnsConstructedCache_ifSingletonSystemExists_andSingletonCacheIsClosed()
 {
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem()), THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(CLOSED)), constructorOf(constructedCache));
+
+    InternalCache result = internalCacheBuilder
+        .create();
+
+    assertThat(result).isSameAs(constructedCache);
+  }
+
+  @Test
+  public void 
create_setsConstructedCache_onSingletonSystem_ifSingletonSystemExists_andSingletonCacheIsClosed()
 {
+    InternalDistributedSystem singletonSystem = singletonSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem), THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(CLOSED)), constructorOf(constructedCache));
+
+    internalCacheBuilder
+        .create();
+
+    verify(singletonSystem).setCache(same(constructedCache));
+  }
+
+  /**
+   * Supplies an existing singleton system together with a CLOSED singleton cache and verifies that
+   * the cache constructor is invoked with that same system instance.
+   */
+  @Test
+  public void create_setsSingletonSystem_onConstructedCache_ifSingletonSystemExists_andSingletonCacheIsClosed() {
+    InternalDistributedSystem singletonSystem = singletonSystem();
+
+    InternalCacheConstructor cacheConstructor = constructorOf(constructedCache());
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem), THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(CLOSED)), cacheConstructor);
+
+    internalCacheBuilder
+        .create();
+
+    // Only the third argument (the system) is pinned; every other argument may be anything.
+    verify(cacheConstructor).construct(anyBoolean(), any(), same(singletonSystem), any(),
+        anyBoolean(), any());
+  }
+
+  /**
+   * Supplies an existing singleton system and an OPEN singleton cache, disables
+   * {@code isExistingOk}, and asserts that {@code create()} throws {@link CacheExistsException}.
+   */
+  @Test
+  public void create_throwsCacheExistsException_ifSingletonSystemExists_andSingletonCacheIsOpen_butExistingIsNotOk() {
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem()), THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(OPEN)), THROWING_CACHE_CONSTRUCTOR);
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(false)
+        .create());
+
+    assertThat(thrown).isInstanceOf(CacheExistsException.class);
+  }
+
+  /**
+   * Uses a mocked {@code CacheConfig} whose {@code validateCacheConfig} throws, with an OPEN
+   * singleton cache and {@code isExistingOk} enabled, and asserts that {@code create()} surfaces
+   * that very same throwable instance.
+   */
+  @Test
+  public void create_propagatesCacheConfigException_ifSingletonSystemExists_andSingletonCacheIsOpen_andExistingIsOk_butCacheIsIncompatible() {
+    Throwable thrownByCacheConfig = new IllegalStateException("incompatible");
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), throwingCacheConfig(thrownByCacheConfig),
+        supplierOf(singletonSystem()), THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(OPEN)), THROWING_CACHE_CONSTRUCTOR);
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(true)
+        .create());
+
+    assertThat(thrown).isSameAs(thrownByCacheConfig);
+  }
+
+  /**
+   * Supplies a singleton system and an OPEN singleton cache and asserts that {@code create()}
+   * returns that same cache instance.
+   */
+  @Test
+  public void create_returnsSingletonCache_ifSingletonCacheIsOpen() {
+    InternalCache openCache = singletonCache(OPEN);
+
+    InternalCacheBuilder builder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        supplierOf(singletonSystem()), THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(openCache), THROWING_CACHE_CONSTRUCTOR);
+
+    InternalCache created = builder.create();
+
+    assertThat(created).isSameAs(openCache);
+  }
+
+  /**
+   * Passes {@code null} as the system to {@code create(system)} and asserts that a
+   * {@link NullPointerException} is raised.
+   */
+  @Test
+  public void createWithSystem_throwsNullPointerException_ifSystemIsNull() {
+    InternalCacheBuilder builder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        THROWING_CACHE_SUPPLIER, THROWING_CACHE_CONSTRUCTOR);
+    ThrowingCallable createWithNullSystem = () -> builder.create(null);
+
+    Throwable raised = catchThrowable(createWithNullSystem);
+
+    assertThat(raised).isInstanceOf(NullPointerException.class);
+  }
+
+  /**
+   * Builds with {@code nullSingletonCacheSupplier} (declared outside this excerpt) and asserts
+   * that {@code create(system)} returns the cache produced by the cache constructor.
+   */
+  @Test
+  public void createWithSystem_returnsConstructedCache_ifNoCacheExists() {
+    InternalCache expectedCache = constructedCache();
+    InternalCacheConstructor cacheConstructor = constructorOf(expectedCache);
+
+    InternalCacheBuilder builder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        nullSingletonCacheSupplier, cacheConstructor);
+
+    InternalCache created = builder.create(givenSystem());
+
+    assertThat(created).isSameAs(expectedCache);
+  }
+
+  /**
+   * Builds with {@code nullSingletonCacheSupplier} (declared outside this excerpt) and verifies
+   * that {@code create(system)} calls {@code setCache} on the given system with the constructed
+   * cache.
+   */
+  @Test
+  public void createWithSystem_setsConstructedCache_onGivenSystem_ifNoCacheExists() {
+    InternalDistributedSystem givenSystem = givenSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        nullSingletonCacheSupplier, constructorOf(constructedCache));
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    verify(givenSystem).setCache(same(constructedCache));
+  }
+
+  /**
+   * Builds with {@code nullSingletonCacheSupplier} (declared outside this excerpt) and verifies
+   * that the cache constructor is invoked with the given system instance.
+   */
+  @Test
+  public void createWithSystem_setsGivenSystem_onConstructedCache_ifNoCacheExists() {
+    InternalDistributedSystem givenSystem = givenSystem();
+
+    InternalCacheConstructor cacheConstructor = constructorOf(constructedCache());
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        nullSingletonCacheSupplier, cacheConstructor);
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    // Only the third argument (the system) is pinned; every other argument may be anything.
+    verify(cacheConstructor).construct(anyBoolean(), any(), same(givenSystem), any(),
+        anyBoolean(), any());
+  }
+
+  /**
+   * Supplies a CLOSED singleton cache and asserts that {@code create(system)} returns the cache
+   * produced by the cache constructor.
+   */
+  @Test
+  public void createWithSystem_returnsConstructedCache_ifSingletonCacheIsClosed() {
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(CLOSED)), constructorOf(constructedCache));
+
+    InternalCache result = internalCacheBuilder
+        .create(givenSystem());
+
+    assertThat(result).isSameAs(constructedCache);
+  }
+
+  /**
+   * Supplies a CLOSED singleton cache and verifies that {@code create(system)} calls
+   * {@code setCache} on the given system with the constructed cache.
+   */
+  @Test
+  public void createWithSystem_setsConstructedCache_onGivenSystem_ifSingletonCacheIsClosed() {
+    InternalDistributedSystem givenSystem = givenSystem();
+    InternalCache constructedCache = constructedCache();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(CLOSED)), constructorOf(constructedCache));
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    verify(givenSystem).setCache(same(constructedCache));
+  }
+
+  /**
+   * Supplies a CLOSED singleton cache and verifies that the cache constructor is invoked with the
+   * given system instance.
+   */
+  @Test
+  public void createWithSystem_setsGivenSystem_onConstructedCache_ifSingletonCacheIsClosed() {
+    InternalDistributedSystem givenSystem = givenSystem();
+
+    InternalCacheConstructor cacheConstructor = constructorOf(constructedCache());
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(CLOSED)), cacheConstructor);
+
+    internalCacheBuilder
+        .create(givenSystem);
+
+    // Only the third argument (the system) is pinned; every other argument may be anything.
+    verify(cacheConstructor).construct(anyBoolean(), any(), same(givenSystem), any(),
+        anyBoolean(), any());
+  }
+
+  /**
+   * Supplies an OPEN singleton cache, disables {@code isExistingOk}, and asserts that
+   * {@code create(system)} throws {@link CacheExistsException}.
+   */
+  @Test
+  public void createWithSystem_throwsCacheExistsException_ifSingletonCacheIsOpen_butExistingIsNotOk() {
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(OPEN)), THROWING_CACHE_CONSTRUCTOR);
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(false)
+        .create(givenSystem()));
+
+    assertThat(thrown).isInstanceOf(CacheExistsException.class);
+  }
+
+  /**
+   * Supplies an OPEN singleton cache with {@code isExistingOk} disabled, discards whatever
+   * {@code create(system)} throws, and verifies that {@code setCache} was never called on the
+   * given system.
+   */
+  @Test
+  public void createWithSystem_doesNotSetSingletonCache_onGivenSystem_ifSingletonCacheIsOpen_butExistingIsNotOk() {
+    InternalDistributedSystem givenSystem = givenSystem();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(OPEN)), THROWING_CACHE_CONSTRUCTOR);
+
+    ignoreThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(false)
+        .create(givenSystem));
+
+    verify(givenSystem, never()).setCache(any());
+  }
+
+  /**
+   * Uses a mocked {@code CacheConfig} whose {@code validateCacheConfig} throws, with an OPEN
+   * singleton cache and {@code isExistingOk} enabled, and asserts that {@code create(system)}
+   * surfaces that very same throwable instance.
+   */
+  @Test
+  public void createWithSystem_propagatesCacheConfigException_ifSingletonCacheIsOpen_andExistingIsOk_butCacheIsIncompatible() {
+    Throwable thrownByCacheConfig = new IllegalStateException("incompatible");
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), throwingCacheConfig(thrownByCacheConfig),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(OPEN)), THROWING_CACHE_CONSTRUCTOR);
+
+    Throwable thrown = catchThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem()));
+
+    assertThat(thrown).isSameAs(thrownByCacheConfig);
+  }
+
+  /**
+   * Uses a mocked {@code CacheConfig} whose {@code validateCacheConfig} throws, with an OPEN
+   * singleton cache and {@code isExistingOk} enabled, discards whatever {@code create(system)}
+   * throws, and verifies that the given system mock received no interactions at all.
+   */
+  @Test
+  public void createWithSystem_doesNotSetSingletonCache_onGivenSystem_ifSingletonCacheIsOpen_andExistingIsOk_butCacheIsNotCompatible() {
+    InternalDistributedSystem givenSystem = givenSystem();
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), throwingCacheConfig(new IllegalStateException("incompatible")),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache(OPEN)), THROWING_CACHE_CONSTRUCTOR);
+
+    ignoreThrowable(() -> internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem));
+
+    verifyZeroInteractions(givenSystem);
+  }
+
+  /**
+   * Supplies an OPEN singleton cache with {@code isExistingOk} enabled and asserts that
+   * {@code create(system)} returns that same singleton cache instance.
+   */
+  @Test
+  public void createWithSystem_returnsSingletonCache_ifSingletonCacheIsOpen_andExistingIsOk_andCacheIsCompatible() {
+    InternalCache singletonCache = singletonCache(OPEN);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache), THROWING_CACHE_CONSTRUCTOR);
+
+    InternalCache result = internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem());
+
+    assertThat(result).isSameAs(singletonCache);
+  }
+
+  /**
+   * Supplies an OPEN singleton cache with {@code isExistingOk} enabled and verifies that
+   * {@code create(system)} calls {@code setCache} on the given system with that singleton cache.
+   */
+  @Test
+  public void createWithSystem_setsSingletonCache_onGivenSystem_ifSingletonCacheIsOpen_andExistingIsOk_andCacheIsCompatible() {
+    InternalDistributedSystem givenSystem = givenSystem();
+    InternalCache singletonCache = singletonCache(OPEN);
+
+    InternalCacheBuilder internalCacheBuilder = new InternalCacheBuilder(
+        new Properties(), new CacheConfig(),
+        THROWING_SYSTEM_SUPPLIER, THROWING_SYSTEM_CONSTRUCTOR,
+        supplierOf(singletonCache), THROWING_CACHE_CONSTRUCTOR);
+
+    internalCacheBuilder
+        .setIsExistingOk(true)
+        .create(givenSystem);
+
+    verify(givenSystem).setCache(same(singletonCache));
+  }
+
+  /** Returns a system mock named "constructedSystem" built by {@code systemWith}. */
+  private InternalDistributedSystem constructedSystem() {
+    final String mockName = "constructedSystem";
+    return systemWith(mockName, ANY_SYSTEM_ID, ANY_MEMBER_NAME);
+  }
+
+  /** Returns a system mock named "givenSystem" built by {@code systemWith}. */
+  private InternalDistributedSystem givenSystem() {
+    final String mockName = "givenSystem";
+    return systemWith(mockName, ANY_SYSTEM_ID, ANY_MEMBER_NAME);
+  }
+
+  /** Returns a system mock named "singletonSystem" built by {@code systemWith}. */
+  private InternalDistributedSystem singletonSystem() {
+    final String mockName = "singletonSystem";
+    return systemWith(mockName, ANY_SYSTEM_ID, ANY_MEMBER_NAME);
+  }
+
+  /**
+   * Creates a named {@code InternalDistributedSystem} mock.
+   *
+   * @param mockName name given to the Mockito mock
+   * @param systemId value returned by the mock config's {@code getDistributedSystemId()}
+   * @param memberName value returned by the mock system's {@code getName()}
+   * @return the stubbed system mock, whose {@code getConfig()} returns a mock
+   *         {@code DistributionConfig}
+   */
+  private InternalDistributedSystem systemWith(String mockName, int systemId, String memberName) {
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class, mockName);
+    DistributionConfig distributionConfig = mock(DistributionConfig.class);
+    when(distributionConfig.getDistributedSystemId()).thenReturn(systemId);
+    when(system.getConfig()).thenReturn(distributionConfig);
+    when(system.getName()).thenReturn(memberName);
+    return system;
+  }
+
+  /** Returns an OPEN cache mock named "constructedCache". */
+  private static InternalCache constructedCache() {
+    final String mockName = "constructedCache";
+    return cache(mockName, OPEN);
+  }
+
+  /** Returns a cache mock named "singletonCache" in the requested open/closed state. */
+  private static InternalCache singletonCache(CacheState state) {
+    final String mockName = "singletonCache";
+    return cache(mockName, state);
+  }
+
+  /**
+   * Creates a named {@code InternalCache} mock whose {@code isClosed()} reflects {@code state} and
+   * whose {@code throwCacheExistsException()} throws a {@link CacheExistsException}.
+   */
+  private static InternalCache cache(String name, CacheState state) {
+    InternalCache mockCache = mock(InternalCache.class, name);
+
+    boolean closed = state.isClosed();
+    when(mockCache.isClosed()).thenReturn(closed);
+
+    CacheExistsException cacheExists = new CacheExistsException(mockCache, "cache exists");
+    doThrow(cacheExists).when(mockCache).throwCacheExistsException();
+
+    return mockCache;
+  }
+
+  /**
+   * Creates a mock system constructor whose {@code construct(any, any)} returns the given system.
+   *
+   * @param constructedSystem the system the mock constructor hands back
+   * @return the stubbed {@code InternalDistributedSystemConstructor} mock
+   */
+  private static InternalDistributedSystemConstructor constructorOf(
+      InternalDistributedSystem constructedSystem) {
+    InternalDistributedSystemConstructor constructor =
+        mock(InternalDistributedSystemConstructor.class, "internal distributed system constructor");
+    when(constructor.construct(any(), any())).thenReturn(constructedSystem);
+    return constructor;
+  }
+
+  /**
+   * Creates a mock cache constructor whose six-argument {@code construct(...)} returns the given
+   * cache for any arguments.
+   *
+   * @param constructedCache the cache the mock constructor hands back
+   * @return the stubbed {@code InternalCacheConstructor} mock
+   */
+  private static InternalCacheConstructor constructorOf(InternalCache constructedCache) {
+    InternalCacheConstructor constructor =
+        mock(InternalCacheConstructor.class, "internal cache constructor");
+    when(constructor.construct(anyBoolean(), any(), any(), any(), anyBoolean(), any()))
+        .thenReturn(constructedCache);
+    return constructor;
+  }
+
+  /** Wraps {@code instance} in a supplier that returns it on every {@code get()}. */
+  private static <T> Supplier<T> supplierOf(T instance) {
+    Supplier<T> fixedSupplier = () -> instance;
+    return fixedSupplier;
+  }
+
+  /**
+   * Creates a {@code CacheConfig} mock whose {@code validateCacheConfig} throws the given
+   * throwable for any argument.
+   */
+  private static CacheConfig throwingCacheConfig(Throwable throwable) {
+    CacheConfig rejectingConfig = mock(CacheConfig.class);
+    doThrow(throwable)
+        .when(rejectingConfig)
+        .validateCacheConfig(any());
+    return rejectingConfig;
+  }
+
+  /**
+   * Invokes the callable and discards anything it throws, so a test can go on to verify mock
+   * interactions after an expected failure.
+   */
+  private static void ignoreThrowable(ThrowingCallable shouldRaiseThrowable) {
+    // catchThrowable traps any Throwable raised by the callable; the captured value is unused.
+    catchThrowable(shouldRaiseThrowable);
+  }
+
+  /** Open/closed state used when stubbing {@code isClosed()} on cache mocks. */
+  enum CacheState {
+    OPEN,
+    CLOSED;
+
+    /** Returns {@code true} only for {@link #CLOSED}. */
+    boolean isClosed() {
+      return this == CLOSED;
+    }
+  }
+}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 1512441..d365325 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -132,11 +132,11 @@ import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.admin.remote.DistributionLocatorId;
 import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.CacheConfig;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCacheBuilder;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.RegionQueue;
@@ -983,10 +983,12 @@ public class WANTestBase extends DistributedTestCase {
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + locPort + "]");
     InternalDistributedSystem ds = test.getSystem(props);
-    CacheConfig cacheConfig = new CacheConfig();
-    cacheConfig.setPdxPersistent(true);
-    cacheConfig.setPdxDiskStore("PDX_TEST");
-    cache = GemFireCacheImpl.create(ds, false, cacheConfig);
+
+    cache = new InternalCacheBuilder(props)
+        .setPdxPersistent(true)
+        .setPdxDiskStore("PDX_TEST")
+        .setIsExistingOk(false)
+        .create(ds);
 
     File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
@@ -2169,11 +2171,14 @@ public class WANTestBase extends DistributedTestCase {
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + locPort + "]");
     InternalDistributedSystem ds = test.getSystem(props);
-    CacheConfig cacheConfig = new CacheConfig();
     File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
-    cacheConfig.setPdxPersistent(true);
-    cacheConfig.setPdxDiskStore("pdxStore");
-    cache = GemFireCacheImpl.create(ds, false, cacheConfig);
+
+    cache = new InternalCacheBuilder(props)
+        .setPdxPersistent(true)
+        .setPdxDiskStore("pdxStore")
+        .setIsExistingOk(false)
+        .create(ds);
+
     cache.createDiskStoreFactory().setDiskDirs(new File[] 
{pdxDir}).setMaxOplogSize(1)
         .create("pdxStore");
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();

Reply via email to