Repository: flume
Updated Branches:
  refs/heads/trunk 0c5b87a5d -> 754183844
FLUME-2439. Upgrade Dataset sink to use Kite 0.15.0 (Ryan Blue via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/75418384
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/75418384
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/75418384

Branch: refs/heads/trunk
Commit: 75418384440fdd566649875f0c6cd61f6b841a5a
Parents: 0c5b87a
Author: Hari Shreedharan <[email protected]>
Authored: Tue Aug 12 16:13:24 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Aug 12 16:14:13 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 22 +++---
 .../org/apache/flume/sink/kite/DatasetSink.java | 82 ++++++++++++--------
 .../flume/sink/kite/DatasetSinkConstants.java   |  5 ++
 .../apache/flume/sink/kite/TestDatasetSink.java | 82 ++++++++++++++++++--
 4 files changed, 137 insertions(+), 54 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index daf6e72..ed90022 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2045,18 +2045,13 @@ Example for agent named a1:
   a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
   a1.sinks.k1.channel = c1
 
-Kite Dataset Sink (experimental)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-.. warning::
-  This source is experimental and may change between minor versions of Flume.
-  Use at your own risk.
+Kite Dataset Sink
+~~~~~~~~~~~~~~~~~
 
-Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/kite-data/guide.html>`_.
+Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/guide/>`_.
 This sink will deserialize the body of each incoming event and store the
-resulting record in a Kite Dataset. It determines target Dataset by opening a
-repository URI, ``kite.repo.uri``, and loading a Dataset by name,
-``kite.dataset.name``.
+resulting record in a Kite Dataset. It determines target Dataset by loading a
+dataset by URI.
 
 The only supported serialization is avro, and the record schema must be passed
 in the event headers, using either ``flume.avro.schema.literal`` with the JSON
@@ -2075,8 +2070,11 @@ Property Name            Default  Description
 =======================  =======  ===========================================================
 **channel**              --
 **type**                 --       Must be org.apache.flume.sink.kite.DatasetSink
-**kite.repo.uri**        --       URI of the repository to open
-**kite.dataset.name**    --       Name of the Dataset where records will be written
+**kite.dataset.uri**     --       URI of the dataset to open
+kite.repo.uri            --       URI of the repository to open
+                                  (deprecated; use kite.dataset.uri instead)
+kite.dataset.name        --       Name of the Dataset where records will be written
+                                  (deprecated; use kite.dataset.uri instead)
 kite.batchSize           100      Number of records to process in each batch
 kite.rollInterval        30       Maximum wait time (seconds) before data files are released
 auth.kerberosPrincipal   --       Kerberos user principal for secure authentication to HDFS
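
For reference, a minimal agent configuration using the new ``kite.dataset.uri`` property could look like the following sketch, assembled from the documented properties above; the agent, sink, and channel names and the dataset URI are illustrative, not part of the commit:

    a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode:8020/data/events
    a1.sinks.k1.kite.batchSize = 100
    a1.sinks.k1.kite.rollInterval = 30

The deprecated ``kite.repo.uri``/``kite.dataset.name`` pair is still accepted, and the tests below confirm that ``kite.dataset.uri`` wins when both forms are present.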

http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index ed1b8d0..8f3ae51 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -34,9 +34,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -52,6 +54,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetRepositories;
 import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.Datasets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,12 +71,13 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
   static Configuration conf = new Configuration();
 
+  private String datasetURI = null;
   private String repositoryURI = null;
   private String datasetName = null;
   private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
-  private Dataset<Object> targetDataset = null;
-  private DatasetWriter<Object> writer = null;
+  private Dataset<GenericRecord> targetDataset = null;
+  private DatasetWriter<GenericRecord> writer = null;
   private UserGroupInformation login = null;
   private SinkCounter counter = null;
@@ -82,16 +86,18 @@ public class DatasetSink extends AbstractSink implements Configurable {
   private long lastRolledMs = 0l;
 
   // for working with avro serialized records
-  private Object datum = null;
+  private GenericRecord datum = null;
+  // TODO: remove this after PARQUET-62 is released
+  private boolean reuseDatum = true;
   private BinaryDecoder decoder = null;
-  private LoadingCache<Schema, ReflectDatumReader<Object>> readers =
+  private LoadingCache<Schema, DatumReader<GenericRecord>> readers =
       CacheBuilder.newBuilder()
-      .build(new CacheLoader<Schema, ReflectDatumReader<Object>>() {
+      .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
         @Override
-        public ReflectDatumReader<Object> load(Schema schema) {
+        public DatumReader<GenericRecord> load(Schema schema) {
           // must use the target dataset's schema for reading to ensure the
           // records are able to be stored using it
-          return new ReflectDatumReader<Object>(
+          return new GenericDatumReader<GenericRecord>(
               schema, targetDataset.getDescriptor().getSchema());
         }
       });
@@ -129,7 +135,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
       });
 
   protected List<String> allowedFormats() {
-    return Lists.newArrayList("avro");
+    return Lists.newArrayList("avro", "parquet");
   }
 
   @Override
@@ -144,25 +150,39 @@ public class DatasetSink extends AbstractSink implements Configurable {
       this.login = KerberosUtil.proxyAs(effectiveUser, login);
     }
 
-    this.repositoryURI = context.getString(
-        DatasetSinkConstants.CONFIG_KITE_REPO_URI);
-    Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
-    this.datasetName = context.getString(
-        DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
-    Preconditions.checkNotNull(datasetName, "Dataset name is missing");
-
-    this.targetDataset = KerberosUtil.runPrivileged(login,
-        new PrivilegedExceptionAction<Dataset<Object>>() {
-      @Override
-      public Dataset<Object> run() {
-        return DatasetRepositories.open(repositoryURI).load(datasetName);
-      }
-    });
+    this.datasetURI = context.getString(
+        DatasetSinkConstants.CONFIG_KITE_DATASET_URI);
+    if (datasetURI != null) {
+      this.targetDataset = KerberosUtil.runPrivileged(login,
+          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
+        @Override
+        public Dataset<GenericRecord> run() {
+          return Datasets.load(datasetURI);
+        }
+      });
+    } else {
+      this.repositoryURI = context.getString(
+          DatasetSinkConstants.CONFIG_KITE_REPO_URI);
+      Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
+      this.datasetName = context.getString(
+          DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
+      Preconditions.checkNotNull(datasetName, "Dataset name is missing");
+
+      this.targetDataset = KerberosUtil.runPrivileged(login,
+          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
+        @Override
+        public Dataset<GenericRecord> run() {
+          return DatasetRepositories.open(repositoryURI).load(datasetName);
+        }
+      });
+    }
 
     String formatName = targetDataset.getDescriptor().getFormat().getName();
     Preconditions.checkArgument(allowedFormats().contains(formatName),
         "Unsupported format: " + formatName);
 
+    this.reuseDatum = !("parquet".equals(formatName));
+
     // other configuration
     this.batchSize = context.getLong(
         DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE,
@@ -176,7 +196,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
   @Override
   public synchronized void start() {
-    this.writer = openWriter(targetDataset);
+    this.writer = targetDataset.newWriter();
     this.lastRolledMs = System.currentTimeMillis();
     counter.start();
     // signal that this sink is ready to process
@@ -219,7 +239,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
       if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) {
         // close the current writer and get a new one
         writer.close();
-        this.writer = openWriter(targetDataset);
+        this.writer = targetDataset.newWriter();
         this.lastRolledMs = System.currentTimeMillis();
         LOG.info("Rolled writer for dataset: " + datasetName);
       }
@@ -238,7 +258,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
           break;
         }
 
-        this.datum = deserialize(event, datum);
+        this.datum = deserialize(event, reuseDatum ? datum : null);
 
         // writeEncoded would be an optimization in some cases, but HBase
         // will not support it and partitioned Datasets need to get partition
@@ -302,11 +322,11 @@ public class DatasetSink extends AbstractSink implements Configurable {
    * @param reuse
    * @return
    */
-  private Object deserialize(Event event, Object reuse)
+  private GenericRecord deserialize(Event event, GenericRecord reuse)
       throws EventDeliveryException {
     decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
     // no checked exception is thrown in the CacheLoader
-    ReflectDatumReader<Object> reader = readers.getUnchecked(schema(event));
+    DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event));
     try {
       return reader.read(reuse, decoder);
     } catch (IOException ex) {
@@ -330,10 +350,4 @@ public class DatasetSink extends AbstractSink implements Configurable {
     }
   }
 
-  private static DatasetWriter<Object> openWriter(Dataset<Object> target) {
-    DatasetWriter<Object> writer = target.newWriter();
-    writer.open();
-    return writer;
-  }
-
 }
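
The core of this change is replacing the two-step repository open plus named load with Kite's single-URI ``Datasets.load``, mirroring the configure() branch above. A minimal standalone sketch of the two equivalent load paths (the URIs and dataset name are illustrative; assumes the Kite data module is on the classpath):

    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetRepositories;
    import org.kitesdk.data.Datasets;

    public class LoadPathsSketch {
      public static void main(String[] args) {
        // New path: one dataset URI names both the repository and the dataset.
        Dataset<GenericRecord> byUri =
            Datasets.load("dataset:file:/tmp/test-repo/events");

        // Deprecated path: open the repository, then load the dataset by name.
        Dataset<GenericRecord> byName = DatasetRepositories
            .open("repo:file:/tmp/test-repo")
            .load("events");
      }
    }

The sink prefers the URI path and falls back to the repository/name pair only when ``kite.dataset.uri`` is unset.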

http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
index 09dfab6..3c67738 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
@@ -22,6 +22,11 @@ public class DatasetSinkConstants {
   /**
    * URI of the Kite DatasetRepository.
    */
+  public static final String CONFIG_KITE_DATASET_URI = "kite.dataset.uri";
+
+  /**
+   * URI of the Kite DatasetRepository.
+   */
   public static final String CONFIG_KITE_REPO_URI = "kite.repo.uri";
 
   /**
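
To make the relationship between the old and new constants concrete: a repository URI plus a dataset name is equivalent to a single dataset URI, as the test fixtures below also show (``FILE_DATASET_URI`` is built from ``FILE_REPO_URI`` and ``DATASET_NAME``). An illustrative pairing, with placeholder paths:

    # deprecated two-property form
    a1.sinks.k1.kite.repo.uri = repo:file:/data/repo
    a1.sinks.k1.kite.dataset.name = events

    # equivalent single-URI form
    a1.sinks.k1.kite.dataset.uri = dataset:file:/data/repo/events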

http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index 51dd408..b448b50 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
@@ -63,12 +64,15 @@ import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetRepositories;
 import org.kitesdk.data.DatasetRepository;
+import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
 
 public class TestDatasetSink {
 
   public static final String FILE_REPO_URI = "repo:file:target/test-repo";
   public static final String DATASET_NAME = "test";
+  public static final String FILE_DATASET_URI =
+      "dataset:file:target/test-repo/" + DATASET_NAME;
   public static final DatasetRepository REPO = DatasetRepositories
       .open(FILE_REPO_URI);
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
@@ -114,15 +118,15 @@ public class TestDatasetSink {
 
   @Before
   public void setup() throws EventDeliveryException {
+    REPO.delete(DATASET_NAME);
     REPO.create(DATASET_NAME, DESCRIPTOR);
 
     this.config = new Context();
-    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI);
-    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME);
-
     this.in = new MemoryChannel();
     Configurables.configure(in, config);
 
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);
+
     GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
     expected = Lists.newArrayList(
         builder.set("id", "1").set("msg", "msg1").build(),
@@ -148,6 +152,44 @@ public class TestDatasetSink {
   }
 
   @Test
+  public void testOldConfig() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, null);
+    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI);
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME);
+
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testDatasetUriOverridesOldConfig() throws EventDeliveryException {
+    // CONFIG_KITE_DATASET_URI is still set, otherwise this will cause an error
+    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, "bad uri");
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "");
+
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
   public void testFileStore() throws EventDeliveryException {
     DatasetSink sink = sink(in, config);
 
@@ -163,6 +205,26 @@ public class TestDatasetSink {
   }
 
   @Test
+  public void testParquetDataset() throws EventDeliveryException {
+    Datasets.delete(FILE_DATASET_URI);
+    Dataset<GenericData.Record> created = Datasets.create(FILE_DATASET_URI,
+        new DatasetDescriptor.Builder(DESCRIPTOR)
+            .format("parquet")
+            .build(),
+        GenericData.Record.class);
+
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(Sets.newHashSet(expected), read(created));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
   public void testPartitionedData() throws EventDeliveryException {
     REPO.create("partitioned", new DatasetDescriptor.Builder(DESCRIPTOR)
         .partitionStrategy(new PartitionStrategy.Builder()
@@ -171,7 +233,8 @@ public class TestDatasetSink {
         .build());
 
     try {
-      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "partitioned");
+      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
+          "dataset:file:target/test-repo/partitioned");
       DatasetSink sink = sink(in, config);
 
       // run the sink
@@ -208,7 +271,8 @@ public class TestDatasetSink {
     hdfsRepo.create(DATASET_NAME, DESCRIPTOR);
 
     // update the config to use the HDFS repository
-    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, repoURI);
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
+        "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo/" + DATASET_NAME);
 
     DatasetSink sink = sink(in, config);
 
@@ -367,12 +431,14 @@ public class TestDatasetSink {
   }
 
   public static <T> HashSet<T> read(Dataset<T> dataset) {
-    DatasetReader<T> reader = dataset.newReader();
+    DatasetReader<T> reader = null;
     try {
-      reader.open();
+      reader = dataset.newReader();
      return Sets.newHashSet(reader.iterator());
     } finally {
-      reader.close();
+      if (reader != null) {
+        reader.close();
+      }
     }
   }
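
For context on what these tests feed the sink: each event body is an Avro-binary-encoded record, and the writer schema travels in the ``flume.avro.schema.literal`` header, as the user guide section above notes. A self-contained sketch of building such an event (the schema and field values are illustrative, chosen to match the test fixtures):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class AvroEventSketch {
      public static Event buildEvent() throws IOException {
        // A record schema with the same shape as the test records (id, msg).
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"msg\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericRecordBuilder(schema)
            .set("id", "1").set("msg", "msg1").build();

        // Serialize the record with the Avro binary encoding.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // The sink locates the writer schema via this event header.
        Event event = EventBuilder.withBody(out.toByteArray());
        event.getHeaders().put("flume.avro.schema.literal", schema.toString());
        return event;
      }
    }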
