Repository: flume Updated Branches: refs/heads/trunk 26444fd7a -> ef2b089ab
FLUME-2469. DatasetSink should load dataset when needed, not at startup. (Ryan Blue via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ef2b089a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ef2b089a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ef2b089a Branch: refs/heads/trunk Commit: ef2b089ab70e04e67b70c23416ef7051e992122b Parents: 26444fd Author: Hari Shreedharan <[email protected]> Authored: Wed Sep 24 23:58:50 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed Sep 24 23:58:50 2014 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/kite/DatasetSink.java | 93 ++++++++++++-------- .../apache/flume/sink/kite/TestDatasetSink.java | 89 +++++++++++++++++++ 2 files changed, 147 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ef2b089a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index 4cd3027..a05d776 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -52,8 +52,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.DatasetException; import org.kitesdk.data.DatasetWriter; import org.kitesdk.data.Datasets; +import org.kitesdk.data.View; +import org.kitesdk.data.spi.Registration; import org.kitesdk.data.spi.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,12 +75,11 @@ public class DatasetSink extends AbstractSink implements Configurable { static Configuration conf = new Configuration(); - private String datasetURI = null; - private String repositoryURI = null; private String datasetName = null; private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE; - private Dataset<GenericRecord> targetDataset = null; + private URI target = null; + private Schema targetSchema = null; private DatasetWriter<GenericRecord> writer = null; private UserGroupInformation login = null; private SinkCounter counter = null; @@ -98,7 +101,7 @@ public class DatasetSink extends AbstractSink implements Configurable { // must use the target dataset's schema for reading to ensure the // records are able to be stored using it return new GenericDatumReader<GenericRecord>( - schema, targetDataset.getDescriptor().getSchema()); + schema, targetSchema); } }); private static LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder @@ -150,39 +153,23 @@ public class DatasetSink extends AbstractSink implements Configurable { this.login = KerberosUtil.proxyAs(effectiveUser, login); } - this.datasetURI = context.getString( - DatasetSinkConstants.CONFIG_KITE_DATASET_URI); + String datasetURI = context.getString( + DatasetSinkConstants.CONFIG_KITE_DATASET_URI); if (datasetURI != null) { - this.targetDataset = KerberosUtil.runPrivileged(login, - new PrivilegedExceptionAction<Dataset<GenericRecord>>() { - @Override - public Dataset<GenericRecord> run() { - return Datasets.load(datasetURI); - } - }); + this.target = URI.create(datasetURI); + this.datasetName = uriToName(target); } else { - this.repositoryURI = context.getString( - DatasetSinkConstants.CONFIG_KITE_REPO_URI); + String repositoryURI = context.getString( + DatasetSinkConstants.CONFIG_KITE_REPO_URI); Preconditions.checkNotNull(repositoryURI, "Repository URI is missing"); this.datasetName = context.getString( - DatasetSinkConstants.CONFIG_KITE_DATASET_NAME); + DatasetSinkConstants.CONFIG_KITE_DATASET_NAME); Preconditions.checkNotNull(datasetName, "Dataset name is missing"); - this.targetDataset = KerberosUtil.runPrivileged(login, - new PrivilegedExceptionAction<Dataset<GenericRecord>>() { - @Override - public Dataset<GenericRecord> run() { - return Datasets.load( - new URIBuilder(repositoryURI, datasetName).build()); - } - }); + this.target = new URIBuilder(repositoryURI, datasetName).build(); } - String formatName = targetDataset.getDescriptor().getFormat().getName(); - Preconditions.checkArgument(allowedFormats().contains(formatName), - "Unsupported format: " + formatName); - - this.reuseDatum = !("parquet".equals(formatName)); + this.setName(target.toString()); // other configuration this.batchSize = context.getLong( @@ -192,12 +179,11 @@ public class DatasetSink extends AbstractSink implements Configurable { DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL, DatasetSinkConstants.DEFAULT_ROLL_INTERVAL); - this.counter = new SinkCounter(getName()); + this.counter = new SinkCounter(datasetName); } @Override public synchronized void start() { - this.writer = targetDataset.newWriter(); this.lastRolledMs = System.currentTimeMillis(); counter.start(); // signal that this sink is ready to process @@ -232,17 +218,22 @@ public class DatasetSink extends AbstractSink implements Configurable { @Override public Status process() throws EventDeliveryException { if (writer == null) { - throw new EventDeliveryException( - "Cannot recover after previous failure"); + try { + this.writer = newWriter(login, target); + } catch (DatasetException e) { + // DatasetException includes DatasetNotFoundException + throw new EventDeliveryException( + "Cannot write to " + getName(), e); + } } // handle file rolling if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) { // close the current writer and get a new one writer.close(); - this.writer = targetDataset.newWriter(); + this.writer = newWriter(login, target); this.lastRolledMs = System.currentTimeMillis(); - LOG.info("Rolled writer for dataset: " + datasetName); + LOG.info("Rolled writer for " + getName()); } Channel channel = getChannel(); @@ -316,6 +307,34 @@ public class DatasetSink extends AbstractSink implements Configurable { } } + private DatasetWriter<GenericRecord> newWriter( + final UserGroupInformation login, final URI uri) { + View<GenericRecord> view = KerberosUtil.runPrivileged(login, + new PrivilegedExceptionAction<Dataset<GenericRecord>>() { + @Override + public Dataset<GenericRecord> run() { + return Datasets.load(uri); + } + }); + + DatasetDescriptor descriptor = view.getDataset().getDescriptor(); + String formatName = descriptor.getFormat().getName(); + Preconditions.checkArgument(allowedFormats().contains(formatName), + "Unsupported format: " + formatName); + + Schema newSchema = descriptor.getSchema(); + if (targetSchema == null || !newSchema.equals(targetSchema)) { + this.targetSchema = descriptor.getSchema(); + // target dataset schema has changed, invalidate all readers based on it + readers.invalidateAll(); + } + + this.reuseDatum = !("parquet".equals(formatName)); + this.datasetName = view.getDataset().getName(); + + return view.newWriter(); + } + /** * Not thread-safe. * @@ -351,4 +370,8 @@ public class DatasetSink extends AbstractSink implements Configurable { } } + private static String uriToName(URI uri) { + return Registration.lookupDatasetUri(URI.create( + uri.getRawSchemeSpecificPart())).second().get("dataset"); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/ef2b089a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java index a277381..c46d66c 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import javax.annotation.Nullable; import org.apache.avro.Schema; @@ -85,6 +86,12 @@ public class TestDatasetSink { public static final Schema INCOMPATIBLE_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"user\",\"fields\":[" + "{\"name\":\"username\",\"type\":\"string\"}]}"); + public static final Schema UPDATED_SCHEMA = new Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" + + "{\"name\":\"id\",\"type\":\"string\"}," + + "{\"name\":\"priority\",\"type\":\"int\", \"default\": 0}," + + "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," + + "\"default\":\"default\"}]}"); public static final DatasetDescriptor DESCRIPTOR = new DatasetDescriptor .Builder() .schema(RECORD_SCHEMA) @@ -252,6 +259,88 @@ public class TestDatasetSink { } @Test + public void testStartBeforeDatasetCreated() throws EventDeliveryException { + // delete the dataset created by setup + Datasets.delete(FILE_DATASET_URI); + + DatasetSink sink = sink(in, config); + + // start the sink + sink.start(); + + // run the sink without a target dataset + try { + sink.process(); + Assert.fail("Should have thrown an exception: no such dataset"); + } catch (EventDeliveryException e) { + // expected + } + + // create the target dataset + Datasets.create(FILE_DATASET_URI, DESCRIPTOR); + + // run the sink + sink.process(); + sink.stop(); + + Assert.assertEquals( + Sets.newHashSet(expected), + read(Datasets.load(FILE_DATASET_URI))); + Assert.assertEquals("Should have committed", 0, remaining(in)); + } + + @Test + public void testDatasetUpdate() throws EventDeliveryException { + // add an updated record that is missing the msg field + GenericRecordBuilder updatedBuilder = new GenericRecordBuilder( + UPDATED_SCHEMA); + GenericData.Record updatedRecord = updatedBuilder + .set("id", "0") + .set("priority", 1) + .set("msg", "Priority 1 message!") + .build(); + + // make a set of the expected records with the new schema + Set<GenericRecord> expectedAsUpdated = Sets.newHashSet(); + for (GenericRecord record : expected) { + expectedAsUpdated.add(updatedBuilder + .clear("priority") + .set("id", record.get("id")) + .set("msg", record.get("msg")) + .build()); + } + expectedAsUpdated.add(updatedRecord); + + DatasetSink sink = sink(in, config); + + // run the sink + sink.start(); + sink.process(); + + // update the dataset's schema + DatasetDescriptor updated = new DatasetDescriptor + .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor()) + .schema(UPDATED_SCHEMA) + .build(); + Datasets.update(FILE_DATASET_URI, updated); + + // trigger a roll on the next process call to refresh the writer + sink.roll(); + + // add the record to the incoming channel and the expected list + putToChannel(in, event(updatedRecord, UPDATED_SCHEMA, null, false)); + + // process events with the updated schema + sink.process(); + sink.stop(); + + Assert.assertEquals( + expectedAsUpdated, + read(Datasets.load(FILE_DATASET_URI))); + Assert.assertEquals("Should have committed", 0, remaining(in)); + } + + @Test public void testMiniClusterStore() throws EventDeliveryException, IOException { // setup a minicluster
