Repository: kudu Updated Branches: refs/heads/master ad620415a -> 7db4d82f4
KUDU-2012 Kudu Flume sink auth support. Added FlumeAuthenticator to KuduSink; the KuduClient is now created inside a PrivilegedExecutor action. Added an extra step to the mini cluster to create a keytab for the client used for testing. Added an automated test with a short KDC ticket lifetime to test ticket reacquisition. Manual testing was done on a secure cluster as well. Change-Id: I11b5f08802883afa178d346af48d3bcd15281917 Reviewed-on: http://gerrit.cloudera.org:8080/11334 Tested-by: Kudu Jenkins Reviewed-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7db4d82f Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7db4d82f Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7db4d82f Branch: refs/heads/master Commit: 7db4d82f4bcb98aba05853249fd4c84677c57227 Parents: ad62041 Author: Ferenc Szabó <[email protected]> Authored: Mon Aug 27 07:16:52 2018 +0200 Committer: Mike Percy <[email protected]> Committed: Fri Sep 28 18:43:34 2018 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/BaseKuduTest.java | 7 ++ .../org/apache/kudu/client/MiniKuduCluster.java | 7 ++ .../org/apache/kudu/flume/sink/KuduSink.java | 25 +++- .../sink/KuduSinkConfigurationConstants.java | 15 +++ .../sink/AvroKuduOperationsProducerTest.java | 63 +++------- .../sink/KeyedKuduOperationsProducerTest.java | 98 ++++------------ .../apache/kudu/flume/sink/KuduSinkTest.java | 64 ++--------- .../kudu/flume/sink/KuduSinkTestUtil.java | 85 ++++++++++++++ .../sink/RegexpKuduOperationsProducerTest.java | 113 +++++++----------- .../kudu/flume/sink/SecureKuduSinkTest.java | 115 +++++++++++++++++++ src/kudu/mini-cluster/external_mini_cluster.cc | 3 + src/kudu/security/test/mini_kdc.cc | 13 +++ src/kudu/security/test/mini_kdc.h | 4 + 13 files changed, 365 insertions(+), 247 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java index 959b92c..c410cf7 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java @@ -300,6 +300,13 @@ public class BaseKuduTest { } /** + * @return path to the mini cluster root directory + */ + protected String getClusterRoot() { + return miniCluster.getClusterRoot(); + } + + /** * Kills all the master servers. * Does nothing to the servers that are already dead. * http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java index a372dfc..dfa5ea7 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java @@ -506,6 +506,13 @@ public class MiniKuduCluster implements AutoCloseable { } /** + * @return path to the mini cluster root directory + */ + public String getClusterRoot() { + return clusterRoot; + } + + /** * Helper runnable that receives stderr and logs it along with the process' identifier. 
*/ public static class ProcessInputStreamLogPrinterRunnable implements Runnable { http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java index 105bc1e..f63f941 100644 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java @@ -21,11 +21,15 @@ package org.apache.kudu.flume.sink; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.BATCH_SIZE; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PROXY_USER; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TIMEOUT_MILLIS; +import java.security.PrivilegedAction; import java.util.List; import com.google.common.base.Preconditions; @@ -35,6 +39,8 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; +import org.apache.flume.auth.FlumeAuthenticationUtil; +import org.apache.flume.auth.PrivilegedExecutor; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import 
org.apache.flume.sink.AbstractSink; @@ -109,6 +115,7 @@ public class KuduSink extends AbstractSink implements Configurable { private KuduClient client; private KuduOperationsProducer operationsProducer; private SinkCounter sinkCounter; + private PrivilegedExecutor privilegedExecutor; public KuduSink() { this(null); @@ -127,7 +134,15 @@ public class KuduSink extends AbstractSink implements Configurable { // Client is not null only inside tests. if (client == null) { - client = new KuduClient.KuduClientBuilder(masterAddresses).build(); + // Creating client with FlumeAuthenticator. + client = privilegedExecutor.execute( + new PrivilegedAction<KuduClient>() { + @Override + public KuduClient run() { + return new KuduClient.KuduClientBuilder(masterAddresses).build(); + } + } + ); } session = client.newSession(); session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); @@ -194,6 +209,12 @@ public class KuduSink extends AbstractSink implements Configurable { timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS); ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS); String operationProducerType = context.getString(PRODUCER); + String kerberosPrincipal = context.getString(KERBEROS_PRINCIPAL); + String kerberosKeytab = context.getString(KERBEROS_KEYTAB); + String proxyUser = context.getString(PROXY_USER); + + privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator( + kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser); // Check for operations producer, if null set default operations producer type. if (operationProducerType == null || operationProducerType.isEmpty()) { @@ -203,7 +224,7 @@ public class KuduSink extends AbstractSink implements Configurable { Context producerContext = new Context(); producerContext.putAll(context.getSubProperties( - KuduSinkConfigurationConstants.PRODUCER_PREFIX)); + KuduSinkConfigurationConstants.PRODUCER_PREFIX)); try { Class<? 
extends KuduOperationsProducer> clazz = http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java index d3b4fb6..dbb2f66 100644 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java @@ -64,4 +64,19 @@ public class KuduSinkConfigurationConstants { * Whether to ignore duplicate primary key errors caused by inserts. */ public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows"; + + /** + * Path to the keytab file used for authentication + */ + public static final String KERBEROS_KEYTAB = "kerberosKeytab"; + + /** + * Kerberos principal used for authentication + */ + public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal"; + + /** + * The effective user if different from the kerberos principal + */ + public static final String PROXY_USER = "proxyUser"; } http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java index 7b20a2c..9f200b8 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java @@ 
-23,10 +23,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER; import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP; import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_URL_HEADER; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -38,7 +36,6 @@ import java.net.URL; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import com.google.common.collect.ImmutableList; @@ -49,15 +46,10 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; -import org.apache.flume.Sink; -import org.apache.flume.Transaction; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; -import org.apache.kudu.util.DecimalUtil; + import org.junit.Test; import org.apache.kudu.ColumnSchema; @@ -66,6 +58,7 @@ import org.apache.kudu.Type; import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; +import org.apache.kudu.util.DecimalUtil; public class AvroKuduOperationsProducerTest extends BaseKuduTest { private static String schemaUriString; @@ 
-117,27 +110,13 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { KuduTable table = createNewTable( String.format("test%sevents%s", eventCount, schemaLocation)); String tableName = table.getName(); - Context ctx = schemaLocation != SchemaLocation.GLOBAL ? new Context() + Context context = schemaLocation != SchemaLocation.GLOBAL ? new Context() : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaUriString)); - KuduSink sink = createSink(tableName, ctx); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); - sink.start(); - - Transaction tx = channel.getTransaction(); - tx.begin(); - writeEventsToChannel(channel, eventCount, schemaLocation); - tx.commit(); - tx.close(); - - Sink.Status status = sink.process(); - if (eventCount == 0) { - assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF); - } else { - assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY); - } + context.put(PRODUCER, AvroKuduOperationsProducer.class.getName()); + + List<Event> events = generateEvents(eventCount, schemaLocation); + + KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events); List<String> answers = makeAnswers(eventCount); List<String> rows = scanTableToStrings(table); @@ -146,11 +125,12 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { } private KuduTable createNewTable(String tableName) throws Exception { - ArrayList<ColumnSchema> columns = new ArrayList<>(5); + List<ColumnSchema> columns = new ArrayList<>(5); columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build()); - columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build()); + 
columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING) + .nullable(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalField", Type.DECIMAL) .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build()); @@ -160,21 +140,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { return createTable(tableName, new Schema(columns), createOptions); } - private KuduSink createSink(String tableName, Context ctx) { - KuduSink sink = new KuduSink(syncClient); - HashMap<String, String> parameters = new HashMap<>(); - parameters.put(TABLE_NAME, tableName); - parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString()); - parameters.put(PRODUCER, AvroKuduOperationsProducer.class.getName()); - Context context = new Context(parameters); - context.putAll(ctx.getParameters()); - Configurables.configure(sink, context); - - return sink; - } - - private void writeEventsToChannel(Channel channel, int eventCount, - SchemaLocation schemaLocation) throws Exception { + private List<Event> generateEvents(int eventCount, + SchemaLocation schemaLocation) throws Exception { + List<Event> events = new ArrayList<>(); for (int i = 0; i < eventCount; i++) { AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord(); record.setKey(10 * i); @@ -195,8 +163,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest { } else if (schemaLocation == SchemaLocation.LITERAL) { e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral)); } - channel.put(e); + events.add(e); } + return events; } private List<String> makeAnswers(int eventCount) { http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git 
a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java index b16a209..1940369 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java @@ -16,13 +16,12 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.kudu.flume.sink; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT; import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.OPERATION_PROP; import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.PAYLOAD_COLUMN_DEFAULT; @@ -31,18 +30,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; -import org.apache.flume.Sink; -import org.apache.flume.Transaction; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.junit.Test; import org.slf4j.Logger; @@ -69,7 +62,7 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest { new 
ColumnSchema.ColumnSchemaBuilder(PAYLOAD_COLUMN_DEFAULT, Type.BINARY) .key(false).build()); CreateTableOptions createOptions = - new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT)) + new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT)) .setNumReplicas(1); KuduTable table = createTable(tableName, new Schema(columns), createOptions); @@ -114,29 +107,22 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest { KuduTable table = createNewTable("testDupUpsertEvents"); String tableName = table.getName(); - Context ctx = new Context(ImmutableMap.of(PRODUCER_PREFIX + OPERATION_PROP, "upsert")); - KuduSink sink = createSink(tableName, ctx); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); + Context ctx = new Context(ImmutableMap.of( + PRODUCER_PREFIX + OPERATION_PROP, "upsert", + PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName() + )); + KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, ctx); sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - int numRows = 3; + List<Event> events = new ArrayList<>(); for (int i = 0; i < numRows; i++) { Event e = EventBuilder.withBody(String.format("payload body %s", i), UTF_8); e.setHeaders(ImmutableMap.of(KEY_COLUMN_DEFAULT, String.format("key %s", i))); - channel.put(e); + events.add(e); } - tx.commit(); - tx.close(); - - Sink.Status status = sink.process(); - assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF); + KuduSinkTestUtil.processEvents(sink, events); List<String> rows = scanTableToStrings(table); assertEquals(numRows + " row(s) expected", numRows, rows.size()); @@ -145,18 +131,10 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest { assertTrue("incorrect payload", rows.get(i).contains("payload body " + i)); } - Transaction utx = channel.getTransaction(); - utx.begin(); - Event 
dup = EventBuilder.withBody("payload body upserted".getBytes(UTF_8)); dup.setHeaders(ImmutableMap.of("key", String.format("key %s", 0))); - channel.put(dup); - utx.commit(); - utx.close(); - - Sink.Status upStatus = sink.process(); - assertTrue("incorrect status for non-empty channel", upStatus != Sink.Status.BACKOFF); + KuduSinkTestUtil.processEvents(sink, ImmutableList.of(dup)); List<String> upRows = scanTableToStrings(table); assertEquals(numRows + " row(s) expected", numRows, upRows.size()); @@ -174,33 +152,14 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest { KuduTable table = createNewTable("test" + eventCount + "events" + operation); String tableName = table.getName(); - Context ctx = new Context(ImmutableMap.of(PRODUCER_PREFIX + OPERATION_PROP, operation)); - KuduSink sink = createSink(tableName, ctx); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); - sink.start(); - - Transaction tx = channel.getTransaction(); - tx.begin(); - - for (int i = 0; i < eventCount; i++) { - Event e = EventBuilder.withBody(String.format("payload body %s", i) - .getBytes(UTF_8)); - e.setHeaders(ImmutableMap.of("key", String.format("key %s", i))); - channel.put(e); - } + Context context = new Context(ImmutableMap.of( + PRODUCER_PREFIX + OPERATION_PROP, operation, + PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName() + )); - tx.commit(); - tx.close(); + List<Event> events = getEvents(eventCount); - Sink.Status status = sink.process(); - if (eventCount == 0) { - assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF); - } else { - assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF); - } + KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events); List<String> rows = scanTableToStrings(table); assertEquals(eventCount + " row(s) expected", eventCount, rows.size()); @@ -212,20 +171,13 @@ public class 
KeyedKuduOperationsProducerTest extends BaseKuduTest { LOG.info("Testing {} events finished successfully.", eventCount); } - private KuduSink createSink(String tableName, Context ctx) { - LOG.info("Creating Kudu sink for '{}' table...", tableName); - - KuduSink sink = new KuduSink(syncClient); - HashMap<String, String> parameters = new HashMap<>(); - parameters.put(TABLE_NAME, tableName); - parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString()); - parameters.put(PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()); - Context context = new Context(parameters); - context.putAll(ctx.getParameters()); - Configurables.configure(sink, context); - - LOG.info("Created Kudu sink for '{}' table.", tableName); - - return sink; + private List<Event> getEvents(int eventCount) { + List<Event> events = new ArrayList<>(); + for (int i = 0; i < eventCount; i++) { + Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8)); + e.setHeaders(ImmutableMap.of("key", String.format("key %s", i))); + events.add(e); + } + return events; } } http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java index d4dce94..eb5f7c8 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.kudu.flume.sink; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -35,8 +37,8 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Sink; +import org.apache.flume.Sink.Status; import org.apache.flume.Transaction; -import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.junit.Assert; @@ -97,13 +99,10 @@ public class KuduSinkTest extends BaseKuduTest { } @Test(expected = FlumeException.class) - public void testMissingTable() throws Exception { + public void testMissingTable() { LOG.info("Testing missing table..."); - KuduSink sink = createSink("missingTable"); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); + KuduSink sink = KuduSinkTestUtil.createSink(syncClient, "missingTable", new Context()); sink.start(); LOG.info("Testing missing table finished successfully."); @@ -140,12 +139,9 @@ public class KuduSinkTest extends BaseKuduTest { Context sinkContext = new Context(); sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, Boolean.toString(ignoreDuplicateRows)); - KuduSink sink = createSink(tableName, sinkContext); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); + KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, sinkContext); sink.start(); + Channel channel = sink.getChannel(); Transaction tx = channel.getTransaction(); tx.begin(); @@ -163,7 +159,7 @@ public class KuduSinkTest extends BaseKuduTest { if (!ignoreDuplicateRows) { fail("Incorrectly 
ignored duplicate rows!"); } - assertTrue("incorrect status for empty channel", status == Sink.Status.READY); + assertSame("incorrect status for empty channel", status, Status.READY); } catch (EventDeliveryException e) { if (ignoreDuplicateRows) { throw new AssertionError("Failed to ignore duplicate rows!", e); @@ -189,31 +185,15 @@ public class KuduSinkTest extends BaseKuduTest { KuduTable table = createNewTable("test" + eventCount + "events"); String tableName = table.getName(); - KuduSink sink = createSink(tableName); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); + List<Event> events = new ArrayList<>(); for (int i = 0; i < eventCount; i++) { - Event e = EventBuilder.withBody(String.format("payload body %s", i) - .getBytes(UTF_8)); - channel.put(e); + Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8)); + events.add(e); } - tx.commit(); - tx.close(); - - Sink.Status status = sink.process(); - if (eventCount == 0) { - assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF); - } else { - assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF); - } + KuduSinkTestUtil.processEventsCreatingSink(syncClient, new Context(), tableName, events); List<String> rows = scanTableToStrings(table); assertEquals(eventCount + " row(s) expected", eventCount, rows.size()); @@ -224,24 +204,4 @@ public class KuduSinkTest extends BaseKuduTest { LOG.info("Testing {} events finished successfully.", eventCount); } - - private KuduSink createSink(String tableName) { - return createSink(tableName, new Context()); - } - - private KuduSink createSink(String tableName, Context ctx) { - LOG.info("Creating Kudu sink for '{}' table...", tableName); - - KuduSink sink = new KuduSink(syncClient); - HashMap<String, String> parameters = new HashMap<>(); - 
parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName); - parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddressesAsString()); - Context context = new Context(parameters); - context.putAll(ctx.getParameters()); - Configurables.configure(sink, context); - - LOG.info("Created Kudu sink for '{}' table.", tableName); - - return sink; - } } http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java new file mode 100644 index 0000000..e24bfe0 --- /dev/null +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java @@ -0,0 +1,85 @@ +package org.apache.kudu.flume.sink; + +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import java.util.List; + +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Sink.Status; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.client.KuduClient; + +class KuduSinkTestUtil { + private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class); + + 
/** Creates an unstarted sink for tableName that uses the given client and that client's master addresses; entries in ctx override those defaults. */ static KuduSink createSink(KuduClient client, String tableName, Context ctx) { + return createSink(tableName, client, ctx, client.getMasterAddressesAsString()); + } + + /** Configures a new KuduSink with the table name and master addresses, then with the caller-supplied ctx entries (applied last, so they win), and attaches a freshly configured MemoryChannel. The sink is returned without being started. */ private static KuduSink createSink( + String tableName, KuduClient client, Context ctx, String masterAddresses) { + LOG.info("Creating Kudu sink for '{}' table...", tableName); + + Context context = new Context(); + context.put(TABLE_NAME, tableName); + context.put(MASTER_ADDRESSES, masterAddresses); + context.putAll(ctx.getParameters()); + KuduSink sink = new KuduSink(client); + Configurables.configure(sink, context); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + sink.setChannel(channel); + + LOG.info("Created Kudu sink for '{}' table.", tableName); + + return sink; + } + + /** Creates an unstarted sink configured with the test-user keytab found under clusterRoot and the matching principal. No client is passed (null), presumably so that the sink builds its own authenticated client -- confirm against KuduSink. NOTE(review): the principal literal was restored from an address-obfuscated placeholder; KRBTEST.COM is assumed to be the mini KDC realm -- confirm. */ static KuduSink createSecureSink(String tableName, String masterAddresses, String clusterRoot) { + Context context = new Context(); + context.put(KERBEROS_KEYTAB, clusterRoot + "/krb5kdc/test-user.keytab"); + context.put(KERBEROS_PRINCIPAL, "test-user@KRBTEST.COM"); + + return createSink(tableName, null, context, masterAddresses); + } + + /** Creates a sink via createSink, starts it, and feeds it the given events through processEvents. */ static void processEventsCreatingSink( + KuduClient syncClient, Context context, String tableName, List<Event> events + ) throws EventDeliveryException { + KuduSink sink = createSink(syncClient, tableName, context); + sink.start(); + processEvents(sink, events); + } + + /** Puts all events on the sink's channel in a single transaction, calls sink.process() once, and asserts that the returned status is BACKOFF exactly when events is empty. */ static void processEvents(KuduSink sink, List<Event> events) throws EventDeliveryException { + Channel channel = sink.getChannel(); + Transaction tx = channel.getTransaction(); + tx.begin(); + for (Event e : events) { + channel.put(e); + } + tx.commit(); + tx.close(); + + Status status = sink.process(); + /* JUnit argument order is (message, expected, actual). */ if (events.isEmpty()) { + assertSame("incorrect status for empty channel", Status.BACKOFF, status); + } else { + assertNotSame("incorrect status for non-empty channel", Status.BACKOFF, status); + } + } +} 
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java index 8b4c3df..cadfa2e 100644 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java @@ -16,34 +16,25 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.kudu.flume.sink; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER; import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP; import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP; import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import com.google.common.collect.ImmutableList; -import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; -import org.apache.flume.Sink; -import org.apache.flume.Transaction; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.conf.Configurables; import 
org.apache.flume.event.EventBuilder; -import org.apache.kudu.util.DecimalUtil; import org.junit.Test; import org.apache.kudu.ColumnSchema; @@ -52,6 +43,7 @@ import org.apache.kudu.Type; import org.apache.kudu.client.BaseKuduTest; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduTable; +import org.apache.kudu.util.DecimalUtil; public class RegexpKuduOperationsProducerTest extends BaseKuduTest { private static final String TEST_REGEXP = @@ -75,8 +67,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build()); CreateTableOptions createOptions = new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1); - KuduTable table = createTable(tableName, new Schema(columns), createOptions); - return table; + return createTable(tableName, new Schema(columns), createOptions); } @Test @@ -117,16 +108,42 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception { String tableName = String.format("test%sevents%srowseach%s", eventCount, perEventRowCount, operation); + Context context = new Context(); + context.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP); + context.put(PRODUCER_PREFIX + OPERATION_PROP, operation); + context.put(PRODUCER, RegexpKuduOperationsProducer.class.getName()); KuduTable table = createNewTable(tableName); - KuduSink sink = createSink(tableName, operation); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); - sink.start(); + List<Event> events = generateEvents(eventCount, perEventRowCount, operation); - Transaction tx = channel.getTransaction(); - tx.begin(); + KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events); + + List<String> rows = scanTableToStrings(table); + assertEquals(eventCount * perEventRowCount + " 
row(s) expected", + eventCount * perEventRowCount, + rows.size()); + + ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount); + for (int i = 0; i < eventCount; i++) { + for (int j = 0; j < perEventRowCount; j++) { + int value = operation.equals("upsert") && i == 0 ? 1 : i; + String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " + + "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " + + "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " + + "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d"; + String rightAnswer = String.format(baseAnswer, value, i, j); + rightAnswers.add(rightAnswer); + } + } + Collections.sort(rightAnswers); + + for (int k = 0; k < eventCount * perEventRowCount; k++) { + assertEquals("incorrect row", rightAnswers.get(k), rows.get(k)); + } + } + + private List<Event> generateEvents(int eventCount, int perEventRowCount, String operation) { + List<Event> events = new ArrayList<>(); for (int i = 0; i < eventCount; i++) { StringBuilder payload = new StringBuilder(); @@ -137,7 +154,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { payload.append(row); } Event e = EventBuilder.withBody(payload.toString().getBytes(UTF_8)); - channel.put(e); + events.add(e); } if (eventCount > 0) { @@ -151,7 +168,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { upserts.append(row); } Event e = EventBuilder.withBody(upserts.toString().getBytes(UTF_8)); - channel.put(e); + events.add(e); } // Also check some bad/corner cases. 
@@ -160,59 +177,9 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest { String[] testCases = {mismatchInInt, emptyString}; for (String testCase : testCases) { Event e = EventBuilder.withBody(testCase.getBytes(UTF_8)); - channel.put(e); - } - } - - tx.commit(); - tx.close(); - - Sink.Status status = sink.process(); - if (eventCount == 0) { - assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF); - } else { - assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF); - } - - List<String> rows = scanTableToStrings(table); - assertEquals(eventCount * perEventRowCount + " row(s) expected", - eventCount * perEventRowCount, - rows.size()); - - ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount); - for (int i = 0; i < eventCount; i++) { - for (int j = 0; j < perEventRowCount; j++) { - int value = operation.equals("upsert") && i == 0 ? 1 : i; - String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " + - "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " + - "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " + - "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d"; - String rightAnswer = String.format(baseAnswer, value, i, j); - rightAnswers.add(rightAnswer); + events.add(e); } } - Collections.sort(rightAnswers); - - for (int k = 0; k < eventCount * perEventRowCount; k++) { - assertEquals("incorrect row", rightAnswers.get(k), rows.get(k)); - } - } - - private KuduSink createSink(String tableName, String operation) { - return createSink(tableName, new Context(), operation); - } - - private KuduSink createSink(String tableName, Context ctx, String operation) { - KuduSink sink = new KuduSink(syncClient); - HashMap<String, String> parameters = new HashMap<>(); - parameters.put(TABLE_NAME, tableName); - parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString()); - parameters.put(PRODUCER, 
RegexpKuduOperationsProducer.class.getName()); - parameters.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP); - parameters.put(PRODUCER_PREFIX + OPERATION_PROP, operation); - Context context = new Context(parameters); - context.putAll(ctx.getParameters()); - Configurables.configure(sink, context); - return sink; + return events; } } http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java new file mode 100644 index 0000000..7fbfcef --- /dev/null +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.kudu.flume.sink; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings; +import static org.apache.kudu.util.SecurityUtil.KUDU_TICKETCACHE_PROPERTY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.event.EventBuilder; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.BaseKuduTest; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder; + +/** Exercises KuduSink against a Kerberos-enabled mini cluster whose tickets last 10 seconds and stay renewable for 30 seconds, writing events both before and after a pause that is longer than the renewable lifetime. */ public class SecureKuduSinkTest extends BaseKuduTest { + private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class); + private static final int TICKET_LIFETIME_SECONDS = 10; + private static final int RENEWABLE_LIFETIME_SECONDS = 30; + + @Before + public void clearTicketCacheProperty() { + // Let Flume authenticate. 
+ System.clearProperty(KUDU_TICKETCACHE_PROPERTY); + } + + /** Builds the mini cluster with Kerberos enabled, using the short ticket and renewable lifetimes declared above. */ @Override + protected MiniKuduClusterBuilder getMiniClusterBuilder() { + return super.getMiniClusterBuilder() + .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s") + .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s") + .enableKerberos(); + } + + /** Writes events 0-4, sleeps for twice the renewable lifetime (60 seconds), writes events 5-9 through the same sink, then checks that all ten rows are present and that row i carries payload i. NOTE(review): the sink is started here but never stopped -- confirm whether teardown takes care of it. */ @Test + public void testEventsWithShortTickets() throws Exception { + LOG.info("Creating new table..."); + ArrayList<ColumnSchema> columns = new ArrayList<>(1); + columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build()); + CreateTableOptions createOptions = + new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload")) + .setNumReplicas(1); + String tableName = "test_long_lived_events"; + KuduTable table = createTable(tableName, new Schema(columns), createOptions); + LOG.info("Created new table."); + + KuduSink sink = KuduSinkTestUtil.createSecureSink( + tableName, getMasterAddressesAsString(), getClusterRoot()); + sink.start(); + + LOG.info("Testing events at the beginning."); + int eventCount = 10; + + processEvents(sink, 0, eventCount / 2); + + LOG.info("Waiting for tickets to expire"); + TimeUnit.SECONDS.sleep(RENEWABLE_LIFETIME_SECONDS * 2); + + LOG.info("Testing events after ticket renewal."); + processEvents(sink, eventCount / 2, eventCount); + + List<String> rows = scanTableToStrings(table); + assertEquals(eventCount + " row(s) expected", eventCount, rows.size()); + + for (int i = 0; i < eventCount; i++) { + assertTrue("incorrect payload", rows.get(i).contains("payload body " + i)); + } + + LOG.info("Testing {} events finished successfully.", eventCount); + } + + /** Builds one event per index in the half-open range from..to and pushes them through the sink via KuduSinkTestUtil.processEvents. */ private void processEvents(KuduSink sink, int from, int to) throws EventDeliveryException { + List<Event> events = new ArrayList<>(); + for (int i = from; i < to; i++) { + Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8)); + events.add(e); + } + + KuduSinkTestUtil.processEvents(sink, events); + LOG.info("Events 
flushed."); + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/mini-cluster/external_mini_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc index 76472fe..414956d 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.cc +++ b/src/kudu/mini-cluster/external_mini_cluster.cc @@ -183,6 +183,9 @@ Status ExternalMiniCluster::Start() { RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"), "could not create unauthorized principal"); + /* Export a keytab for the test-user principal; presumably this is the file the Flume sink test loads as <cluster root>/krb5kdc/test-user.keytab. */ RETURN_NOT_OK_PREPEND(kdc_->CreateKeytabForExistingPrincipal("test-user"), + "could not create client keytab"); + RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"), "could not kinit as admin"); http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/security/test/mini_kdc.cc ---------------------------------------------------------------------- diff --git a/src/kudu/security/test/mini_kdc.cc b/src/kudu/security/test/mini_kdc.cc index 904695a..f4151e6 100644 --- a/src/kudu/security/test/mini_kdc.cc +++ b/src/kudu/security/test/mini_kdc.cc @@ -272,6 +272,19 @@ Status MiniKdc::CreateServiceKeytab(const string& spn, return Status::OK(); } +/* Exports the key of the existing principal spn into <data_root>/<spn>.keytab (any slash in spn becomes an underscore) by running kadmin.local with xst -norandkey, which leaves the principal's key unchanged. Returns the first non-OK status from locating or running kadmin.local. */ Status MiniKdc::CreateKeytabForExistingPrincipal(const string& spn) { + SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("creating keytab for $0", spn)); + string kt_path = spn; + StripString(&kt_path, "/", '_'); + kt_path = JoinPathSegments(options_.data_root, kt_path) + ".keytab"; + + string kadmin; + RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin)); + RETURN_NOT_OK(Subprocess::Call(MakeArgv({ + kadmin, "-q", Substitute("xst -norandkey -k $0 $1", kt_path, spn)}))); + return Status::OK(); +} + Status MiniKdc::Kinit(const string& username) { SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("kinit for $0", username)); string kinit; 
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/security/test/mini_kdc.h ---------------------------------------------------------------------- diff --git a/src/kudu/security/test/mini_kdc.h b/src/kudu/security/test/mini_kdc.h index e282cc4..95e7848 100644 --- a/src/kudu/security/test/mini_kdc.h +++ b/src/kudu/security/test/mini_kdc.h @@ -92,6 +92,10 @@ class MiniKdc { // will be reset and a new keytab will be generated. Status CreateServiceKeytab(const std::string& spn, std::string* path); + // Creates a keytab for the existing principal 'spn' (e.g. "test-user") without changing its key. + // The keytab is written to <data_root>/<spn>.keytab, with any '/' in 'spn' replaced by '_'. + Status CreateKeytabForExistingPrincipal(const std::string& spn); + // Kinit a user to the mini KDC. Status Kinit(const std::string& username) WARN_UNUSED_RESULT;
