Repository: flume
Updated Branches:
  refs/heads/flume-1.6 94d8ea40a -> 709534a65


FLUME-2469. DatasetSink should load dataset when needed, not at startup.

(Ryan Blue via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/709534a6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/709534a6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/709534a6

Branch: refs/heads/flume-1.6
Commit: 709534a6514b5fe86890058ecaf94a8b6a6ff44a
Parents: 94d8ea4
Author: Hari Shreedharan <[email protected]>
Authored: Wed Sep 24 23:58:50 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Wed Sep 24 23:59:35 2014 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/kite/DatasetSink.java | 93 ++++++++++++--------
 .../apache/flume/sink/kite/TestDatasetSink.java | 89 +++++++++++++++++++
 2 files changed, 147 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/709534a6/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 4cd3027..a05d776 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -52,8 +52,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetException;
 import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
+import org.kitesdk.data.View;
+import org.kitesdk.data.spi.Registration;
 import org.kitesdk.data.spi.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,12 +75,11 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
   static Configuration conf = new Configuration();
 
-  private String datasetURI = null;
-  private String repositoryURI = null;
   private String datasetName = null;
   private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
 
-  private Dataset<GenericRecord> targetDataset = null;
+  private URI target = null;
+  private Schema targetSchema = null;
   private DatasetWriter<GenericRecord> writer = null;
   private UserGroupInformation login = null;
   private SinkCounter counter = null;
@@ -98,7 +101,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
           // must use the target dataset's schema for reading to ensure the
           // records are able to be stored using it
           return new GenericDatumReader<GenericRecord>(
-              schema, targetDataset.getDescriptor().getSchema());
+            schema, targetSchema);
         }
       });
   private static LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder
@@ -150,39 +153,23 @@ public class DatasetSink extends AbstractSink implements Configurable {
       this.login = KerberosUtil.proxyAs(effectiveUser, login);
     }
 
-    this.datasetURI = context.getString(
-        DatasetSinkConstants.CONFIG_KITE_DATASET_URI);
+    String datasetURI = context.getString(
+      DatasetSinkConstants.CONFIG_KITE_DATASET_URI);
     if (datasetURI != null) {
-      this.targetDataset = KerberosUtil.runPrivileged(login,
-          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
-            @Override
-            public Dataset<GenericRecord> run() {
-              return Datasets.load(datasetURI);
-            }
-          });
+      this.target = URI.create(datasetURI);
+      this.datasetName = uriToName(target);
     } else {
-      this.repositoryURI = context.getString(
-          DatasetSinkConstants.CONFIG_KITE_REPO_URI);
+      String repositoryURI = context.getString(
+        DatasetSinkConstants.CONFIG_KITE_REPO_URI);
       Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
       this.datasetName = context.getString(
-          DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
+        DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
       Preconditions.checkNotNull(datasetName, "Dataset name is missing");
 
-      this.targetDataset = KerberosUtil.runPrivileged(login,
-          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
-            @Override
-            public Dataset<GenericRecord> run() {
-              return Datasets.load(
-                  new URIBuilder(repositoryURI, datasetName).build());
-            }
-          });
+      this.target = new URIBuilder(repositoryURI, datasetName).build();
     }
 
-    String formatName = targetDataset.getDescriptor().getFormat().getName();
-    Preconditions.checkArgument(allowedFormats().contains(formatName),
-        "Unsupported format: " + formatName);
-
-    this.reuseDatum = !("parquet".equals(formatName));
+    this.setName(target.toString());
 
     // other configuration
     this.batchSize = context.getLong(
@@ -192,12 +179,11 @@ public class DatasetSink extends AbstractSink implements Configurable {
         DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL,
         DatasetSinkConstants.DEFAULT_ROLL_INTERVAL);
 
-    this.counter = new SinkCounter(getName());
+    this.counter = new SinkCounter(datasetName);
   }
 
   @Override
   public synchronized void start() {
-    this.writer = targetDataset.newWriter();
     this.lastRolledMs = System.currentTimeMillis();
     counter.start();
     // signal that this sink is ready to process
@@ -232,17 +218,22 @@ public class DatasetSink extends AbstractSink implements Configurable {
   @Override
   public Status process() throws EventDeliveryException {
     if (writer == null) {
-      throw new EventDeliveryException(
-          "Cannot recover after previous failure");
+      try {
+        this.writer = newWriter(login, target);
+      } catch (DatasetException e) {
+        // DatasetException includes DatasetNotFoundException
+        throw new EventDeliveryException(
+          "Cannot write to " + getName(), e);
+      }
     }
 
     // handle file rolling
     if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) {
       // close the current writer and get a new one
       writer.close();
-      this.writer = targetDataset.newWriter();
+      this.writer = newWriter(login, target);
       this.lastRolledMs = System.currentTimeMillis();
-      LOG.info("Rolled writer for dataset: " + datasetName);
+      LOG.info("Rolled writer for " + getName());
     }
 
     Channel channel = getChannel();
@@ -316,6 +307,34 @@ public class DatasetSink extends AbstractSink implements Configurable {
     }
   }
 
+  private DatasetWriter<GenericRecord> newWriter(
+    final UserGroupInformation login, final URI uri) {
+    View<GenericRecord> view = KerberosUtil.runPrivileged(login,
+      new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
+        @Override
+        public Dataset<GenericRecord> run() {
+          return Datasets.load(uri);
+        }
+      });
+
+    DatasetDescriptor descriptor = view.getDataset().getDescriptor();
+    String formatName = descriptor.getFormat().getName();
+    Preconditions.checkArgument(allowedFormats().contains(formatName),
+      "Unsupported format: " + formatName);
+
+    Schema newSchema = descriptor.getSchema();
+    if (targetSchema == null || !newSchema.equals(targetSchema)) {
+      this.targetSchema = descriptor.getSchema();
+      // target dataset schema has changed, invalidate all readers based on it
+      readers.invalidateAll();
+    }
+
+    this.reuseDatum = !("parquet".equals(formatName));
+    this.datasetName = view.getDataset().getName();
+
+    return view.newWriter();
+  }
+
   /**
    * Not thread-safe.
    *
@@ -351,4 +370,8 @@ public class DatasetSink extends AbstractSink implements Configurable {
     }
   }
 
+  private static String uriToName(URI uri) {
+    return Registration.lookupDatasetUri(URI.create(
+      uri.getRawSchemeSpecificPart())).second().get("dataset");
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/709534a6/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index a277381..c46d66c 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
@@ -85,6 +86,12 @@ public class TestDatasetSink {
   public static final Schema INCOMPATIBLE_SCHEMA = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"user\",\"fields\":[" +
           "{\"name\":\"username\",\"type\":\"string\"}]}");
+  public static final Schema UPDATED_SCHEMA = new Schema.Parser().parse(
+      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
+          "{\"name\":\"id\",\"type\":\"string\"}," +
+          "{\"name\":\"priority\",\"type\":\"int\", \"default\": 0}," +
+          "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," +
+          "\"default\":\"default\"}]}");
   public static final DatasetDescriptor DESCRIPTOR = new DatasetDescriptor
       .Builder()
       .schema(RECORD_SCHEMA)
@@ -252,6 +259,88 @@ public class TestDatasetSink {
   }
 
   @Test
+  public void testStartBeforeDatasetCreated() throws EventDeliveryException {
+    // delete the dataset created by setup
+    Datasets.delete(FILE_DATASET_URI);
+
+    DatasetSink sink = sink(in, config);
+
+    // start the sink
+    sink.start();
+
+    // run the sink without a target dataset
+    try {
+      sink.process();
+      Assert.fail("Should have thrown an exception: no such dataset");
+    } catch (EventDeliveryException e) {
+      // expected
+    }
+
+    // create the target dataset
+    Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
+
+    // run the sink
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+      Sets.newHashSet(expected),
+      read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testDatasetUpdate() throws EventDeliveryException {
+    // add an updated record that is missing the msg field
+    GenericRecordBuilder updatedBuilder = new GenericRecordBuilder(
+      UPDATED_SCHEMA);
+    GenericData.Record updatedRecord = updatedBuilder
+      .set("id", "0")
+      .set("priority", 1)
+      .set("msg", "Priority 1 message!")
+      .build();
+
+    // make a set of the expected records with the new schema
+    Set<GenericRecord> expectedAsUpdated = Sets.newHashSet();
+    for (GenericRecord record : expected) {
+      expectedAsUpdated.add(updatedBuilder
+        .clear("priority")
+        .set("id", record.get("id"))
+        .set("msg", record.get("msg"))
+        .build());
+    }
+    expectedAsUpdated.add(updatedRecord);
+
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // update the dataset's schema
+    DatasetDescriptor updated = new DatasetDescriptor
+      .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor())
+      .schema(UPDATED_SCHEMA)
+      .build();
+    Datasets.update(FILE_DATASET_URI, updated);
+
+    // trigger a roll on the next process call to refresh the writer
+    sink.roll();
+
+    // add the record to the incoming channel and the expected list
+    putToChannel(in, event(updatedRecord, UPDATED_SCHEMA, null, false));
+
+    // process events with the updated schema
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+      expectedAsUpdated,
+      read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
   public void testMiniClusterStore()
       throws EventDeliveryException, IOException {
     // setup a minicluster

Reply via email to