Repository: kudu Updated Branches: refs/heads/master 6466c0d7d -> 8513685ba
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java new file mode 100644 index 0000000..eeb9e9c --- /dev/null +++ b/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java @@ -0,0 +1,445 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.kudu.test; + +import com.google.common.base.Stopwatch; +import com.stumbleupon.async.Deferred; +import org.apache.kudu.Common; +import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.AsyncKuduClient.AsyncKuduClientBuilder; +import org.apache.kudu.client.DeadlineTracker; +import org.apache.kudu.client.FakeDNS; +import org.apache.kudu.client.HostAndPort; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.LocatedTablet; +import org.apache.kudu.client.MiniKuduCluster; +import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder; +import org.apache.kudu.client.RemoteTablet; +import org.apache.kudu.junit.RetryRule; +import org.apache.kudu.master.Master; +import org.apache.kudu.util.RandomUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.fail; + +/** + * A Junit Rule that manages a Kudu cluster and clients for testing. + * This rule also includes utility methods for the cluster + * and clients. + * + * <pre> + * public static class TestFoo { + * + * @Rule + * public KuduTestHarness harness = new KuduTestHarness(); + * + * ... + * } + * </pre> + */ [email protected] [email protected] +public class KuduTestHarness extends ExternalResource { + + private static final Logger LOG = LoggerFactory.getLogger(KuduTestHarness.class); + + private static final int NUM_MASTER_SERVERS = 3; + private static final int NUM_TABLET_SERVERS = 3; + + // Default timeout/sleep interval for various client operations, + // waiting for various jobs/threads to complete, etc. + public static final int DEFAULT_SLEEP = 50000; + + private final Random randomForTSRestart = RandomUtils.getRandom(); + + private MiniKuduClusterBuilder clusterBuilder; + private MiniKuduCluster miniCluster; + + // We create both versions of the asyncClient for ease of use. + private AsyncKuduClient asyncClient; + private KuduClient client; + + public KuduTestHarness(final MiniKuduClusterBuilder clusterBuilder) { + this.clusterBuilder = clusterBuilder; + } + + public KuduTestHarness() { + this.clusterBuilder = getBaseClusterBuilder(); + } + + /** + * Returns the base MiniKuduClusterBuilder used when creating a + * KuduTestHarness with the default constructor. This is useful + * if you want to add to the default cluster setup. + */ + public static MiniKuduClusterBuilder getBaseClusterBuilder() { + return new MiniKuduClusterBuilder() + .numMasterServers(NUM_MASTER_SERVERS) + .numTabletServers(NUM_TABLET_SERVERS); + } + + @Override + public Statement apply(Statement base, Description description) { + // Set any master server flags defined in the method level annotation. + MasterServerConfig masterServerConfig = description.getAnnotation(MasterServerConfig.class); + if (masterServerConfig != null) { + for (String flag : masterServerConfig.flags()) { + clusterBuilder.addMasterServerFlag(flag); + } + } + // Set any tablet server flags defined in the method level annotation. + TabletServerConfig tabletServerConfig = description.getAnnotation(TabletServerConfig.class); + if (tabletServerConfig != null) { + for (String flag : tabletServerConfig.flags()) { + clusterBuilder.addTabletServerFlag(flag); + } + } + + // Generate the ExternalResource Statement. + Statement statement = super.apply(base, description); + // Wrap in the RetryRule to rerun flaky tests. + return new RetryRule().apply(statement, description); + } + + @Override + public void before() throws Exception { + FakeDNS.getInstance().install(); + LOG.info("Creating a new MiniKuduCluster..."); + miniCluster = clusterBuilder.build(); + LOG.info("Creating a new Kudu client..."); + asyncClient = new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString()) + .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP) + .build(); + client = asyncClient.syncClient(); + } + + @Override + public void after() { + try { + if (client != null) { + client.shutdown(); + // No need to explicitly shutdown the async client, + // shutting down the sync client effectively does that. + } + } catch (KuduException e) { + LOG.warn("Error while shutting down the test client"); + } finally { + if (miniCluster != null) { + miniCluster.shutdown(); + } + } + } + + public KuduClient getClient() { + return client; + } + + public AsyncKuduClient getAsyncClient() { + return asyncClient; + } + + /** + * Helper method to easily kill a tablet server that serves the given table's only tablet's + * leader. The currently running test case will be failed if there's more than one tablet, + * if the tablet has no leader after some retries, or if the tablet server was already killed. + * + * This method is thread-safe. + * @param table a KuduTable which will get its single tablet's leader killed. + * @throws Exception + */ + public void killTabletLeader(KuduTable table) throws Exception { + List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP); + if (tablets.isEmpty() || tablets.size() > 1) { + fail("Currently only support killing leaders for tables containing 1 tablet, table " + + table.getName() + " has " + tablets.size()); + } + LocatedTablet tablet = tablets.get(0); + if (tablet.getReplicas().size() == 1) { + fail("Table " + table.getName() + " only has 1 tablet, please enable replication"); + } + + HostAndPort hp = findLeaderTabletServer(tablet); + miniCluster.killTabletServer(hp); + } + + /** + * Helper method to kill a tablet server that serves the given tablet's + * leader. The currently running test case will be failed if the tablet has no + * leader after some retries, or if the tablet server was already killed. + * + * This method is thread-safe. + * @param tablet a RemoteTablet which will get its leader killed + * @throws Exception + */ + public void killTabletLeader(RemoteTablet tablet) throws Exception { + killTabletLeader(new LocatedTablet(tablet)); + } + + /** + * Helper method to kill a tablet server that serves the given tablet's + * leader. The currently running test case will be failed if the tablet has no + * leader after some retries, or if the tablet server was already killed. + * + * This method is thread-safe. + * @param tablet a LocatedTablet which will get its leader killed + * @throws Exception + */ + public void killTabletLeader(LocatedTablet tablet) throws Exception { + HostAndPort hp = findLeaderTabletServer(tablet); + miniCluster.killTabletServer(hp); + } + + /** + * Finds the RPC port of the given tablet's leader tserver. + * @param tablet a LocatedTablet + * @return the host and port of the given tablet's leader tserver + * @throws Exception if we are unable to find the leader tserver + */ + public HostAndPort findLeaderTabletServer(LocatedTablet tablet) + throws Exception { + LocatedTablet.Replica leader = null; + DeadlineTracker deadlineTracker = new DeadlineTracker(); + deadlineTracker.setDeadline(DEFAULT_SLEEP); + while (leader == null) { + if (deadlineTracker.timedOut()) { + fail("Timed out while trying to find a leader for this table"); + } + + leader = tablet.getLeaderReplica(); + if (leader == null) { + LOG.info("Sleeping while waiting for a tablet LEADER to arise, currently slept {} ms", + deadlineTracker.getElapsedMillis()); + Thread.sleep(50); + } + } + return new HostAndPort(leader.getRpcHost(), leader.getRpcPort()); + } + + /** + * Helper method to easily kill the leader master. + * + * This method is thread-safe. + * @throws Exception if there is an error finding or killing the leader master. + */ + public void killLeaderMasterServer() throws Exception { + HostAndPort hp = findLeaderMasterServer(); + miniCluster.killMasterServer(hp); + } + + /** + * Find the host and port of the leader master. + * @return the host and port of the leader master + * @throws Exception if we are unable to find the leader master + */ + public HostAndPort findLeaderMasterServer() throws Exception { + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) { + Deferred<Master.GetTableLocationsResponsePB> masterLocD = + asyncClient.getMasterTableLocationsPB(null); + Master.GetTableLocationsResponsePB r = masterLocD.join(DEFAULT_SLEEP); + Common.HostPortPB pb = r.getTabletLocations(0) + .getReplicas(0) + .getTsInfo() + .getRpcAddresses(0); + if (pb.getPort() != -1) { + return new HostAndPort(pb.getHost(), pb.getPort()); + } + } + throw new IOException(String.format("No leader master found after %d ms", DEFAULT_SLEEP)); + } + + /** + * Picks at random a tablet server that serves tablets from the passed table and restarts it. + * @param table table to query for a TS to restart + * @throws Exception + */ + public void restartTabletServer(KuduTable table) throws Exception { + List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP); + if (tablets.isEmpty()) { + fail("Table " + table.getName() + " doesn't have any tablets"); + } + + LocatedTablet tablet = tablets.get(0); + LocatedTablet.Replica replica = + tablet.getReplicas().get(randomForTSRestart.nextInt(tablet.getReplicas().size())); + HostAndPort hp = new HostAndPort(replica.getRpcHost(), replica.getRpcPort()); + miniCluster.killTabletServer(hp); + miniCluster.startTabletServer(hp); + } + + /** + * Kills a tablet server that serves the given tablet's leader and restarts it. + * @param tablet a RemoteTablet which will get its leader killed and restarted + * @throws Exception + */ + public void restartTabletServer(RemoteTablet tablet) throws Exception { + HostAndPort hp = findLeaderTabletServer(new LocatedTablet(tablet)); + miniCluster.killTabletServer(hp); + miniCluster.startTabletServer(hp); + } + + /** + * Kills and restarts the leader master. + * @throws Exception + */ + public void restartLeaderMaster() throws Exception { + HostAndPort hp = findLeaderMasterServer(); + miniCluster.killMasterServer(hp); + miniCluster.startMasterServer(hp); + } + + /** + * Return the comma-separated list of "host:port" pairs that describes the master + * config for this cluster. + * @return The master config string. + */ + public String getMasterAddressesAsString() { + return miniCluster.getMasterAddressesAsString(); + } + + /** + * @return the list of master servers + */ + public List<HostAndPort> getMasterServers() { + return miniCluster.getMasterServers(); + } + + /** + * @return the list of tablet servers + */ + public List<HostAndPort> getTabletServers() { + return miniCluster.getMasterServers(); + } + + /** + * @return path to the mini cluster root directory + */ + public String getClusterRoot() { + return miniCluster.getClusterRoot(); + } + + /** + * Kills all the master servers. + * Does nothing to the servers that are already dead. + * + * @throws IOException + */ + public void killAllMasterServers() throws IOException { + miniCluster.killAllMasterServers(); + } + + /** + * Starts all the master servers. + * Does nothing to the servers that are already running. + * + * @throws IOException + */ + public void startAllMasterServers() throws IOException { + miniCluster.startAllMasterServers(); + } + + /** + * Kills all the tablet servers. + * Does nothing to the servers that are already dead. + * + * @throws IOException + */ + public void killAllTabletServers() throws IOException { + miniCluster.killAllTabletServers(); + } + + /** + * Starts all the tablet servers. + * Does nothing to the servers that are already running. + * + * @throws IOException + */ + public void startAllTabletServers() throws IOException { + miniCluster.startAllTabletServers(); + } + + /** + * Removes all credentials for all principals from the Kerberos credential cache. + */ + public void kdestroy() throws IOException { + miniCluster.kdestroy(); + } + + /** + * Re-initialize Kerberos credentials for the given username, writing them + * into the Kerberos credential cache. + * @param username the username to kinit as + */ + public void kinit(String username) throws IOException { + miniCluster.kinit(username); + } + + /** + * Resets the clients so that their state is completely fresh, including meta + * cache, connections, open tables, sessions and scanners, and propagated timestamp. + */ + public void resetClients() throws IOException { + client.shutdown(); + asyncClient = new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString()) + .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP) + .build(); + client = asyncClient.syncClient(); + } + + /** + * An annotation that can be added to each test method to + * define additional master server flags to be used when + * creating the test cluster. + * + * ex: @MasterServerConfig(flags = { "key1=valA", "key2=valB" }) + */ + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.METHOD}) + public @interface MasterServerConfig { + String[] flags(); + } + + /** + * An annotation that can be added to each test method to + * define additional tablet server flags to be used when + * creating the test cluster. + * + * ex: @TabletServerConfig(flags = { "key1=valA", "key2=valB" }) + */ + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.METHOD}) + public @interface TabletServerConfig { + String[] flags(); + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java index 9f200b8..91bc339 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java @@ -49,18 +49,18 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; - +import org.junit.Rule; import org.junit.Test; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; +import org.apache.kudu.test.KuduTestHarness; import org.apache.kudu.util.DecimalUtil; -public class AvroKuduOperationsProducerTest extends BaseKuduTest { +public class AvroKuduOperationsProducerTest { private static String schemaUriString; private static String schemaLiteral; @@ -80,6 +80,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { GLOBAL, URL, LITERAL } + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + @Test public void testEmptyChannel() throws Exception { testEvents(0, SchemaLocation.GLOBAL); @@ -116,7 +119,7 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { List<Event> events = generateEvents(eventCount, schemaLocation); - KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events); + KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events); List<String> answers = makeAnswers(eventCount); List<String> rows = scanTableToStrings(table); @@ -137,7 +140,7 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { CreateTableOptions createOptions = new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")) .setNumReplicas(1); - return createTable(tableName, new Schema(columns), createOptions); + return harness.getClient().createTable(tableName, new Schema(columns), createOptions); } private List<Event> generateEvents(int eventCount, http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java index 1940369..8914b06 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.kudu.flume.sink; import static java.nio.charset.StandardCharsets.UTF_8; @@ -37,6 +36,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,13 +44,16 @@ import org.slf4j.LoggerFactory; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; +import org.apache.kudu.test.KuduTestHarness; -public class KeyedKuduOperationsProducerTest extends BaseKuduTest { +public class KeyedKuduOperationsProducerTest { private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduOperationsProducerTest.class); + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + private KuduTable createNewTable(String tableName) throws Exception { LOG.info("Creating new table..."); @@ -64,7 +67,8 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest { CreateTableOptions createOptions = new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT)) .setNumReplicas(1); - KuduTable table = createTable(tableName, new Schema(columns), createOptions); + KuduTable table = + harness.getClient().createTable(tableName, new Schema(columns), createOptions); LOG.info("Created new table."); @@ -111,7 +115,7 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest { PRODUCER_PREFIX + OPERATION_PROP, "upsert", PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName() )); - KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, ctx); + KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, ctx); sink.start(); int numRows = 3; @@ -159,7 +163,7 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest { List<Event> events = getEvents(eventCount); - KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events); + KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events); List<String> rows = scanTableToStrings(table); assertEquals(eventCount + " row(s) expected", eventCount, rows.size()); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java index eb5f7c8..3887482 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.kudu.flume.sink; import static java.nio.charset.StandardCharsets.UTF_8; @@ -41,7 +40,9 @@ import org.apache.flume.Sink.Status; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.kudu.test.KuduTestHarness; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,13 +50,15 @@ import org.slf4j.LoggerFactory; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; -public class KuduSinkTest extends BaseKuduTest { +public class KuduSinkTest { private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class); + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + private KuduTable createNewTable(String tableName) throws Exception { LOG.info("Creating new table..."); @@ -64,7 +67,7 @@ public class KuduSinkTest extends BaseKuduTest { CreateTableOptions createOptions = new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload")) .setNumReplicas(1); - KuduTable table = createTable(tableName, new Schema(columns), createOptions); + KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions); LOG.info("Created new table."); @@ -75,7 +78,7 @@ public class KuduSinkTest extends BaseKuduTest { public void testMandatoryParameters() { LOG.info("Testing mandatory parameters..."); - KuduSink sink = new KuduSink(syncClient); + KuduSink sink = new KuduSink(harness.getClient()); HashMap<String, String> parameters = new HashMap<>(); Context context = new Context(parameters); @@ -102,7 +105,7 @@ public class KuduSinkTest extends BaseKuduTest { public void testMissingTable() { LOG.info("Testing missing table..."); - KuduSink sink = KuduSinkTestUtil.createSink(syncClient, "missingTable", new Context()); + KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), "missingTable", new Context()); sink.start(); LOG.info("Testing missing table finished successfully."); @@ -139,7 +142,7 @@ public class KuduSinkTest extends BaseKuduTest { Context sinkContext = new Context(); sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, Boolean.toString(ignoreDuplicateRows)); - KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, sinkContext); + KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, sinkContext); sink.start(); Channel channel = sink.getChannel(); @@ -193,7 +196,7 @@ public class KuduSinkTest extends BaseKuduTest { events.add(e); } - KuduSinkTestUtil.processEventsCreatingSink(syncClient, new Context(), tableName, events); + KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), new Context(), tableName, events); List<String> rows = scanTableToStrings(table); assertEquals(eventCount + " row(s) expected", eventCount, rows.size()); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java index 9bc5942..fda478c 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.apache.flume.event.EventBuilder; +import org.apache.kudu.test.KuduTestHarness; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -27,12 +28,11 @@ import org.junit.rules.ExpectedException; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; import org.apache.kudu.util.CapturingLogAppender; -public class RegexpKuduOperationsProducerParseErrorTest extends BaseKuduTest { +public class RegexpKuduOperationsProducerParseErrorTest { private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<stringFld>\\w+)"; private static final String TEST_REGEXP_MISSING_COLUMN = "(?<key>\\d+),(?<byteFld>\\d+)"; private static final String TEST_OPERATION = "insert"; @@ -54,6 +54,9 @@ public class RegexpKuduOperationsProducerParseErrorTest extends BaseKuduTest { private static final String POLICY_IGNORE = "IGNORE"; @Rule + public KuduTestHarness harness = new KuduTestHarness(); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Test @@ -265,7 +268,7 @@ public class RegexpKuduOperationsProducerParseErrorTest extends BaseKuduTest { columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build()); CreateTableOptions createOptions = new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1); - KuduTable table = createTable(tableName, new Schema(columns), createOptions); + KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions); return table; } http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java index cadfa2e..b5c4e28 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.kudu.flume.sink; import static java.nio.charset.StandardCharsets.UTF_8; @@ -35,22 +34,26 @@ import com.google.common.collect.ImmutableList; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; +import org.apache.kudu.test.KuduTestHarness; +import org.junit.Rule; import org.junit.Test; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; import org.apache.kudu.util.DecimalUtil; -public class RegexpKuduOperationsProducerTest extends BaseKuduTest { +public class RegexpKuduOperationsProducerTest { private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," + "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," + "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*),(?<decimalFld>\\d+.\\d*)"; + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + private KuduTable createNewTable(String tableName) throws Exception { ArrayList<ColumnSchema> columns = new ArrayList<>(10); columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()); @@ -67,7 +70,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build()); CreateTableOptions createOptions = new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1); - return createTable(tableName, new Schema(columns), createOptions); + return harness.getClient().createTable(tableName, new Schema(columns), createOptions); } @Test @@ -116,7 +119,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { List<Event> events = generateEvents(eventCount, perEventRowCount, operation); - KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events); + KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events); List<String> rows = scanTableToStrings(table); assertEquals(eventCount * perEventRowCount + " row(s) expected", http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java index 7fbfcef..e84e1f9 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java @@ -33,7 +33,9 @@ import com.google.common.collect.ImmutableList; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.event.EventBuilder; +import org.apache.kudu.test.KuduTestHarness; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,30 +43,29 @@ import org.slf4j.LoggerFactory; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder; -public class SecureKuduSinkTest extends BaseKuduTest { +public class SecureKuduSinkTest { private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class); private static final int TICKET_LIFETIME_SECONDS = 10; private static final int RENEWABLE_LIFETIME_SECONDS = 30; + private static final MiniKuduClusterBuilder clusterBuilder = KuduTestHarness.getBaseClusterBuilder() + .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s") + .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s") + .enableKerberos(); + + @Rule + public KuduTestHarness harness = new KuduTestHarness(clusterBuilder); + @Before public void clearTicketCacheProperty() { // Let Flume authenticate. System.clearProperty(KUDU_TICKETCACHE_PROPERTY); } - @Override - protected MiniKuduClusterBuilder getMiniClusterBuilder() { - return super.getMiniClusterBuilder() - .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s") - .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s") - .enableKerberos(); - } - @Test public void testEventsWithShortTickets() throws Exception { LOG.info("Creating new table..."); @@ -74,11 +75,11 @@ public class SecureKuduSinkTest extends BaseKuduTest { new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload")) .setNumReplicas(1); String tableName = "test_long_lived_events"; - KuduTable table = createTable(tableName, new Schema(columns), createOptions); + KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions); LOG.info("Created new table."); KuduSink sink = KuduSinkTestUtil.createSecureSink( - tableName, getMasterAddressesAsString(), getClusterRoot()); + tableName, harness.getMasterAddressesAsString(), harness.getClusterRoot()); sink.start(); LOG.info("Testing events at the beginning."); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java index cd0e95a..ef175b5 100644 --- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java +++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java @@ -16,6 +16,7 @@ // under the License. package org.apache.kudu.mapreduce; +import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP; import static org.apache.kudu.util.ClientTestUtil.createFourTabletsTableWithNineRows; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -30,16 +31,19 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.kudu.Schema; +import org.apache.kudu.test.KuduTestHarness; +import org.apache.kudu.util.ClientTestUtil; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.KuduPredicate; import org.apache.kudu.client.RowResult; -public class ITInputFormatJob extends BaseKuduTest { +public class ITInputFormatJob { private static final Logger LOG = LoggerFactory.getLogger(ITInputFormatJob.class); private static final String TABLE_NAME = @@ -47,9 +51,14 @@ public class ITInputFormatJob extends BaseKuduTest { private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility(); + private static final Schema basicSchema = ClientTestUtil.getBasicSchema(); + /** Counter enumeration to count the actual rows. */ private enum Counters { ROWS } + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + @After public void tearDown() throws Exception { HADOOP_UTIL.cleanup(); @@ -59,7 +68,7 @@ public class ITInputFormatJob extends BaseKuduTest { @SuppressWarnings("deprecation") public void test() throws Exception { - createFourTabletsTableWithNineRows(client, TABLE_NAME, DEFAULT_SLEEP); + createFourTabletsTableWithNineRows(harness.getAsyncClient(), TABLE_NAME, DEFAULT_SLEEP); JobConf conf = new JobConf(); HADOOP_UTIL.setupAndGetTestDir(ITInputFormatJob.class.getName(), conf).getAbsolutePath(); @@ -92,7 +101,7 @@ public class ITInputFormatJob extends BaseKuduTest { job, TABLE_NAME, "*", - getMasterAddressesAsString()) + harness.getMasterAddressesAsString()) .operationTimeoutMs(DEFAULT_SLEEP) .addDependencies(false) .cacheBlocks(false) http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java index 3266535..14a0c55 100644 --- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java +++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java @@ -16,6 +16,7 @@ // under the License. package org.apache.kudu.mapreduce; +import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP; import static org.apache.kudu.util.ClientTestUtil.getBasicCreateTableOptions; import static org.apache.kudu.util.ClientTestUtil.getBasicSchema; import static org.junit.Assert.assertEquals; @@ -32,27 +33,31 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.kudu.test.KuduTestHarness; +import org.junit.Rule; import org.junit.Test; import org.apache.kudu.Schema; import org.apache.kudu.client.AsyncKuduSession; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduPredicate; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RowResult; -public class ITKuduTableInputFormat extends BaseKuduTest { +public class ITKuduTableInputFormat { private static final String TABLE_NAME = ITKuduTableInputFormat.class.getName() + "-" + System.currentTimeMillis(); + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + @Test public void test() throws Exception { - createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions()); + harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions()); - KuduTable table = openTable(TABLE_NAME); + KuduTable table = harness.getClient().openTable(TABLE_NAME); Schema schema = getBasicSchema(); Insert insert = table.newInsert(); PartialRow row = insert.getRow(); @@ -61,7 +66,7 @@ public class ITKuduTableInputFormat extends BaseKuduTest { row.addInt(2, 3); row.addString(3, "a string"); row.addBoolean(4, true); - AsyncKuduSession session = client.newSession(); + AsyncKuduSession session = harness.getAsyncClient().newSession(); session.apply(insert).join(DEFAULT_SLEEP); session.close().join(DEFAULT_SLEEP); @@ -122,7 +127,7 @@ public class ITKuduTableInputFormat extends BaseKuduTest { List<KuduPredicate> predicates) throws IOException, InterruptedException { KuduTableInputFormat input = new KuduTableInputFormat(); Configuration conf = new Configuration(); - conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, getMasterAddressesAsString()); + conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, harness.getMasterAddressesAsString()); conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, TABLE_NAME); if (columnProjection != null) { conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java index 597461f..f8a2b27 100644 --- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java +++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java @@ -25,27 +25,31 @@ import static org.junit.Assert.assertNotNull; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.kudu.test.KuduTestHarness; +import org.junit.Rule; import org.junit.Test; import org.apache.kudu.client.AsyncKuduScanner; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; import org.apache.kudu.client.PartialRow; -public class ITKuduTableOutputFormat extends BaseKuduTest { +public class ITKuduTableOutputFormat { private static final String TABLE_NAME = ITKuduTableOutputFormat.class.getName() + "-" + System.currentTimeMillis(); + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + @Test public void test() throws Exception { - createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions()); + harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions()); KuduTableOutputFormat output = new KuduTableOutputFormat(); Configuration conf = new Configuration(); - conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, getMasterAddressesAsString()); + conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, harness.getMasterAddressesAsString()); conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, TABLE_NAME); output.setConf(conf); @@ -64,7 +68,7 @@ public class ITKuduTableOutputFormat extends BaseKuduTest { RecordWriter<NullWritable, Operation> rw = output.getRecordWriter(null); rw.write(NullWritable.get(), insert); rw.close(null); - AsyncKuduScanner.AsyncKuduScannerBuilder builder = client.newScannerBuilder(table); + AsyncKuduScanner.AsyncKuduScannerBuilder builder = harness.getAsyncClient().newScannerBuilder(table); assertEquals(1, countRowsInScan(builder.build())); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java index 9c9918d..080f16a 100644 --- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java +++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java @@ -17,6 +17,7 @@ package org.apache.kudu.mapreduce; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP; import static org.apache.kudu.util.ClientTestUtil.countRowsInScan; import static org.apache.kudu.util.ClientTestUtil.getBasicCreateTableOptions; import static org.apache.kudu.util.ClientTestUtil.getBasicSchema; @@ -35,28 +36,31 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.kudu.test.KuduTestHarness; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.apache.kudu.client.AsyncKuduScanner; -import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; import org.apache.kudu.client.PartialRow; - -public class ITOutputFormatJob extends BaseKuduTest { +public class ITOutputFormatJob { private static final String TABLE_NAME = ITOutputFormatJob.class.getName() + "-" + System.currentTimeMillis(); private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility(); + @Rule + public KuduTestHarness harness = new KuduTestHarness(); + @Before public void setUp() throws Exception { - createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions()); + harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions()); } @After @@ -88,7 +92,7 @@ public class ITOutputFormatJob extends BaseKuduTest { new KuduTableMapReduceUtil.TableOutputFormatConfigurator( job, TABLE_NAME, - getMasterAddressesAsString()) + harness.getMasterAddressesAsString()) .operationTimeoutMs(DEFAULT_SLEEP) .addDependencies(false) .configure(); @@ -96,9 +100,9 @@ public class ITOutputFormatJob extends BaseKuduTest { assertTrue("Test job did not end properly", job.waitForCompletion(true)); // Make sure the data's there - KuduTable table = openTable(TABLE_NAME); + KuduTable table = harness.getClient().openTable(TABLE_NAME); AsyncKuduScanner.AsyncKuduScannerBuilder builder = - client.newScannerBuilder(table); + harness.getAsyncClient().newScannerBuilder(table); assertEquals(2, countRowsInScan(builder.build())); } http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala index 5b5fc1b..af6423b 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala @@ -129,8 +129,8 @@ trait KuduTestSuite extends JUnitSuite { @Before def setUpBase(): Unit = { miniCluster = new MiniKuduClusterBuilder() - .numMasters(1) - .numTservers(1) + .numMasterServers(1) + .numTabletServers(1) .build() ss = SparkSession.builder().config(conf).getOrCreate()
