Repository: atlas Updated Branches: refs/heads/master bf5f8ef05 -> 15534f235
ATLAS-2470 - JanusGraph Cassandra. Updates to unit test. Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/15534f23 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/15534f23 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/15534f23 Branch: refs/heads/master Commit: 15534f235dea3cc5e993b461b8ee72f3efabee55 Parents: bf5f8ef Author: Ashutosh Mestry <[email protected]> Authored: Wed Apr 11 17:01:59 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Wed Apr 11 17:01:59 2018 -0700 ---------------------------------------------------------------------- .../audit/CassandraAuditRepositoryTest.java | 67 +++++++++++++++----- 1 file changed, 50 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/15534f23/repository/src/test/java/org/apache/atlas/repository/audit/CassandraAuditRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/CassandraAuditRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/audit/CassandraAuditRepositoryTest.java index 20118a1..4135687 100644 --- a/repository/src/test/java/org/apache/atlas/repository/audit/CassandraAuditRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/audit/CassandraAuditRepositoryTest.java @@ -18,13 +18,15 @@ package org.apache.atlas.repository.audit; -import org.apache.atlas.ApplicationProperties; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; import org.apache.atlas.AtlasException; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import org.apache.thrift.transport.TTransportException; import 
org.cassandraunit.utils.EmbeddedCassandraServerHelper; +import org.testng.SkipException; import org.testng.annotations.BeforeClass; import java.io.IOException; @@ -32,22 +34,53 @@ import java.util.HashMap; import java.util.Map; public class CassandraAuditRepositoryTest extends AuditRepositoryTestBase { + private static final int MAX_RETRIES = 9; + private final String CLUSTER_HOST = "localhost"; + private final String CLUSTER_NAME_TEST = "Test Cluster"; + private final int CLUSTER_PORT = 9042; - @BeforeClass - public void setup() throws InterruptedException, TTransportException, ConfigurationException, IOException, - AtlasException { - EmbeddedCassandraServerHelper.startEmbeddedCassandra("cassandra_test.yml"); - eventRepository = new CassandraBasedAuditRepository(); - Map<String, Object> props = new HashMap<>(); - props.put(CassandraBasedAuditRepository.MANAGE_EMBEDDED_CASSANDRA, Boolean.TRUE); - props.put(CassandraBasedAuditRepository.CASSANDRA_CLUSTERNAME_PROPERTY, "Test Cluster"); - props.put(CassandraBasedAuditRepository.CASSANDRA_HOSTNAME_PROPERTY, "localhost"); - props.put(CassandraBasedAuditRepository.CASSANDRA_PORT_PROPERTY, 9042); - Configuration atlasConf = new MapConfiguration(props); - ((CassandraBasedAuditRepository)eventRepository).setApplicationProperties(atlasConf); - ((CassandraBasedAuditRepository)eventRepository).start(); - // Pause for a second to ensure that the embedded cluster has started - Thread.sleep(1000); - } + @BeforeClass + public void setup() throws InterruptedException, TTransportException, ConfigurationException, IOException, + AtlasException { + EmbeddedCassandraServerHelper.startEmbeddedCassandra("cassandra_test.yml"); + eventRepository = new CassandraBasedAuditRepository(); + Configuration atlasConf = new MapConfiguration(getClusterProperties()); + ((CassandraBasedAuditRepository) eventRepository).setApplicationProperties(atlasConf); + ((CassandraBasedAuditRepository) eventRepository).start(); + ensureClusterCreation(); + } + + 
private Map<String, Object> getClusterProperties() { + Map<String, Object> props = new HashMap<>(); + props.put(CassandraBasedAuditRepository.MANAGE_EMBEDDED_CASSANDRA, Boolean.TRUE); + props.put(CassandraBasedAuditRepository.CASSANDRA_CLUSTERNAME_PROPERTY, CLUSTER_NAME_TEST); + props.put(CassandraBasedAuditRepository.CASSANDRA_HOSTNAME_PROPERTY, CLUSTER_HOST); + props.put(CassandraBasedAuditRepository.CASSANDRA_PORT_PROPERTY, CLUSTER_PORT); + return props; + } + + private void ensureClusterCreation() throws InterruptedException { + // Retry the connection until we either connect or timeout + Cluster.Builder cassandraClusterBuilder = Cluster.builder(); + Cluster cluster = + cassandraClusterBuilder.addContactPoint(CLUSTER_HOST).withClusterName(CLUSTER_NAME_TEST).withPort(CLUSTER_PORT) + .build(); + int retryCount = 0; + + while (retryCount < MAX_RETRIES) { + try { + Session cassSession = cluster.connect(); + if (cassSession.getState().getConnectedHosts().size() > 0) { + cassSession.close(); + return; + } + } catch (Exception e) { + Thread.sleep(1000); + } + retryCount++; + } + + throw new SkipException("Unable to connect to embedded Cassandra after " + MAX_RETRIES + " seconds."); + } }
