Repository: flume
Updated Branches:
  refs/heads/flume-1.6 13a03003e -> 914106c0f
http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index c46d66c..58aa467 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -18,6 +18,8 @@
 
 package org.apache.flume.sink.kite;
 
+import org.apache.flume.sink.kite.parser.EntityParser;
+import org.apache.flume.sink.kite.policy.FailurePolicy;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -29,6 +31,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -37,12 +40,14 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -52,6 +57,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -64,9 +70,11 @@ import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
 import org.kitesdk.data.View;
+import static org.mockito.Mockito.*;
 
 public class TestDatasetSink {
 
@@ -74,6 +82,8 @@ public class TestDatasetSink {
   public static final String DATASET_NAME = "test";
   public static final String FILE_DATASET_URI =
       "dataset:file:target/test-repo/" + DATASET_NAME;
+  public static final String ERROR_DATASET_URI =
+      "dataset:file:target/test-repo/failed-events";
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
   public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -127,6 +137,7 @@ public class TestDatasetSink {
     Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
 
     this.config = new Context();
+    config.put("keep-alive", "0");
 
     this.in = new MemoryChannel();
     Configurables.configure(in, config);
@@ -195,7 +206,7 @@
   }
 
   @Test
-  public void testFileStore() throws EventDeliveryException {
+  public void testFileStore() throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException {
     DatasetSink sink = sink(in, config);
 
     // run the sink
@@ -222,6 +233,19 @@ public class TestDatasetSink {
     // run the sink
     sink.start();
     sink.process();
+
+    // the transaction should not commit during the call to process
+    assertThrows("Transaction should still be open", IllegalStateException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            in.getTransaction().begin();
+            return null;
+          }
+        });
+    // The records won't commit until the call to stop()
+    Assert.assertEquals("Should not have committed", 0, read(created).size());
+
     sink.stop();
 
     Assert.assertEquals(Sets.newHashSet(expected), read(created));
@@ -509,6 +533,376 @@ public class TestDatasetSink {
         expected.size() + 1, remaining(in));
   }
 
+  @Test
+  public void testFileStoreWithSavePolicy() throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testMissingSchemaWithSavePolicy() throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    final DatasetSink sink = sink(in, config);
+
+    Event badEvent = new SimpleEvent();
+    badEvent.setHeaders(Maps.<String, String>newHashMap());
+    badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
+    putToChannel(in, badEvent);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals("Good records should have been written",
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
+    Assert.assertEquals("Should have saved the bad event",
+        Sets.newHashSet(AvroFlumeEvent.newBuilder()
+            .setBody(ByteBuffer.wrap(badEvent.getBody()))
+            .setHeaders(toUtf8Map(badEvent.getHeaders()))
+            .build()),
+        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
+  }
+
+  @Test
+  public void testSerializedWithIncompatibleSchemasWithSavePolicy()
+      throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    final DatasetSink sink = sink(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(
+        INCOMPATIBLE_SCHEMA);
+    GenericData.Record rec = builder.set("username", "koala").build();
+
+    // We pass in a valid schema in the header, but an incompatible schema
+    // was used to serialize the record
+    Event badEvent = event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true);
+    putToChannel(in, badEvent);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals("Good records should have been written",
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
+    Assert.assertEquals("Should have saved the bad event",
+        Sets.newHashSet(AvroFlumeEvent.newBuilder()
+            .setBody(ByteBuffer.wrap(badEvent.getBody()))
+            .setHeaders(toUtf8Map(badEvent.getHeaders()))
+            .build()),
+        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
+  }
+
+  @Test
+  public void testSerializedWithIncompatibleSchemas() throws EventDeliveryException {
+    final DatasetSink sink = sink(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(
+        INCOMPATIBLE_SCHEMA);
+    GenericData.Record rec = builder.set("username", "koala").build();
+
+    // We pass in a valid schema in the header, but an incompatible schema
+    // was used to serialize the record
+    putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true));
+
+    // run the sink
+    sink.start();
+    assertThrows("Should fail", EventDeliveryException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            sink.process();
+            return null;
+          }
+        });
+    sink.stop();
+
+    Assert.assertEquals("Should have rolled back",
+        expected.size() + 1, remaining(in));
+  }
+
+  @Test
+  public void testCommitOnBatch() throws EventDeliveryException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // the transaction should commit during the call to process
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+    // but the data won't be visible yet
+    Assert.assertEquals(0,
+        read(Datasets.load(FILE_DATASET_URI)).size());
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCommitOnBatchFalse() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // the transaction should not commit during the call to process
+    assertThrows("Transaction should still be open", IllegalStateException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            in.getTransaction().begin();
+            return null;
+          }
+        });
+
+    // the data won't be visible
+    Assert.assertEquals(0,
+        read(Datasets.load(FILE_DATASET_URI)).size());
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    // the transaction should commit during the call to stop
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testCommitOnBatchFalseSyncOnBatchTrue() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(true));
+
+    try {
+      sink(in, config);
+      Assert.fail("Should have thrown IllegalArgumentException");
+    } catch (IllegalArgumentException ex) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testCloseAndCreateWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.closeWriter();
+    sink.commitTransaction();
+    sink.createWriter();
+
+    Assert.assertNotNull("Writer should not be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCloseWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.closeWriter();
+    sink.commitTransaction();
+
+    Assert.assertNull("Writer should be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCreateWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.commitTransaction();
+    sink.createWriter();
+    Assert.assertNotNull("Writer should not be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(0, read(Datasets.load(FILE_DATASET_URI)).size());
+  }
+
+  @Test
+  public void testAppendWriteExceptionInvokesPolicy()
+      throws EventDeliveryException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // Mock an Event
+    Event mockEvent = mock(Event.class);
+    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
+
+    // Mock a GenericRecord
+    GenericRecord mockRecord = mock(GenericRecord.class);
+
+    // Mock an EntityParser
+    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
+    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
+        .thenReturn(mockRecord);
+    sink.setParser(mockParser);
+
+    // Mock a FailurePolicy
+    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
+    sink.setFailurePolicy(mockFailurePolicy);
+
+    // Mock a DatasetWriter
+    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
+    doThrow(new DataFileWriter.AppendWriteException(new IOException()))
+        .when(mockWriter).write(mockRecord);
+
+    sink.setWriter(mockWriter);
+    sink.write(mockEvent);
+
+    // Verify that the event was sent to the failure policy
+    verify(mockFailurePolicy).handle(eq(mockEvent), any(Throwable.class));
+
+    sink.stop();
+  }
+
+  @Test
+  public void testRuntimeExceptionThrowsEventDeliveryException()
+      throws EventDeliveryException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // Mock an Event
+    Event mockEvent = mock(Event.class);
+    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
+
+    // Mock a GenericRecord
+    GenericRecord mockRecord = mock(GenericRecord.class);
+
+    // Mock an EntityParser
+    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
+    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
+        .thenReturn(mockRecord);
+    sink.setParser(mockParser);
+
+    // Mock a FailurePolicy
+    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
+    sink.setFailurePolicy(mockFailurePolicy);
+
+    // Mock a DatasetWriter
+    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
+    doThrow(new RuntimeException()).when(mockWriter).write(mockRecord);
+
+    sink.setWriter(mockWriter);
+
+    try {
+      sink.write(mockEvent);
+      Assert.fail("Should throw EventDeliveryException");
+    } catch (EventDeliveryException ex) {
+
+    }
+
+    // Verify that the event was not sent to the failure policy
+    verify(mockFailurePolicy, never()).handle(eq(mockEvent), any(Throwable.class));
+
+    sink.stop();
+  }
+
+  @Test
+  public void testProcessHandlesNullWriter() throws EventDeliveryException,
+      NonRecoverableEventException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // explicitly set the writer to null
+    sink.setWriter(null);
+
+    // this should not throw an NPE
+    sink.process();
+
+    sink.stop();
+
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
   public static DatasetSink sink(Channel in, Context config) {
     DatasetSink sink = new DatasetSink();
     sink.setChannel(in);
@@ -621,4 +1015,19 @@ public class TestDatasetSink {
       Assert.assertEquals(message, expected, actual.getClass());
     }
   }
+
+  /**
+   * Helper function to convert a map of String to a map of Utf8.
+   *
+   * @param map A Map of String to String
+   * @return The same mappings converting the {@code String}s to {@link Utf8}s
+   */
+  public static Map<CharSequence, CharSequence> toUtf8Map(
+      Map<String, String> map) {
+    Map<CharSequence, CharSequence> utf8Map = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      utf8Map.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
+    }
+    return utf8Map;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7d4f01a..1350fa4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@ limitations under the License.
 
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
-    <kite.version>0.15.0</kite.version>
+    <kite.version>0.17.1</kite.version>
     <hive.version>0.10.0</hive.version>
 
   </properties>
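
Note on the assertThrows(...) calls in the test diff above: the new tests reuse a helper that is already defined earlier in TestDatasetSink; only its tail (the Assert.assertEquals(message, expected, actual.getClass()); line) appears as context in the final hunk. For readers without the full file, a minimal sketch of such a helper is shown below. It is illustrative only: the wrapper class name is hypothetical and the body is inferred from how the tests call the helper, not copied from the commit.

import java.util.concurrent.Callable;
import org.junit.Assert;

public class AssertThrowsSketch {
  /**
   * Sketch of an assertThrows-style helper: run the callable and check that
   * it throws an exception of exactly the expected class. The real helper
   * lives in TestDatasetSink; this is an illustrative stand-in.
   */
  public static void assertThrows(String message,
      Class<? extends Throwable> expected, Callable<?> callable) {
    try {
      callable.call();
      Assert.fail("No exception was thrown, expected: " + expected.getName());
    } catch (Throwable actual) {
      Assert.assertEquals(message, expected, actual.getClass());
    }
  }
}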
