Repository: flume
Updated Branches:
  refs/heads/trunk 0c5b87a5d -> 754183844


FLUME-2439. Upgrade Dataset sink to use Kite 0.15.0

(Ryan Blue via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/75418384
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/75418384
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/75418384

Branch: refs/heads/trunk
Commit: 75418384440fdd566649875f0c6cd61f6b841a5a
Parents: 0c5b87a
Author: Hari Shreedharan <[email protected]>
Authored: Tue Aug 12 16:13:24 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Aug 12 16:14:13 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 22 +++---
 .../org/apache/flume/sink/kite/DatasetSink.java | 82 ++++++++++++--------
 .../flume/sink/kite/DatasetSinkConstants.java   |  5 ++
 .../apache/flume/sink/kite/TestDatasetSink.java | 82 ++++++++++++++++++--
 4 files changed, 137 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index daf6e72..ed90022 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2045,18 +2045,13 @@ Example for agent named a1:
   a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
   a1.sinks.k1.channel = c1
 
-Kite Dataset Sink (experimental)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-.. warning::
-  This source is experimental and may change between minor versions of Flume.
-  Use at your own risk.
+Kite Dataset Sink
+~~~~~~~~~~~~~~~~~
 
-Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/kite-data/guide.html>`_.
+Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/guide/>`_.
 This sink will deserialize the body of each incoming event and store the
-resulting record in a Kite Dataset. It determines target Dataset by opening a
-repository URI, ``kite.repo.uri``, and loading a Dataset by name,
-``kite.dataset.name``.
+resulting record in a Kite Dataset. It determines the target Dataset by loading
+it from the dataset URI given in ``kite.dataset.uri``.
 
 The only supported serialization is avro, and the record schema must be passed
 in the event headers, using either ``flume.avro.schema.literal`` with the JSON
@@ -2075,8 +2070,11 @@ Property Name            Default  Description
 =======================  =======  ===========================================================
 **channel**              --
 **type**                 --       Must be org.apache.flume.sink.kite.DatasetSink
-**kite.repo.uri**        --       URI of the repository to open
-**kite.dataset.name**    --       Name of the Dataset where records will be written
+**kite.dataset.uri**     --       URI of the dataset to open
+kite.repo.uri            --       URI of the repository to open
+                                  (deprecated; use kite.dataset.uri instead)
+kite.dataset.name        --       Name of the Dataset where records will be written
+                                  (deprecated; use kite.dataset.uri instead)
 kite.batchSize           100      Number of records to process in each batch
 kite.rollInterval        30       Maximum wait time (seconds) before data files are released
 auth.kerberosPrincipal   --       Kerberos user principal for secure authentication to HDFS
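
For readers unfamiliar with the new property, here is a small illustration (not part of this patch) of how the single kite.dataset.uri setting replaces the deprecated pair, written against the Flume Context API the way the updated tests below build their configuration; the HDFS paths are made-up examples:

    import org.apache.flume.Context;

    public class KiteSinkConfigSketch {
      public static Context exampleConfig() {
        Context config = new Context();
        // New style: one URI names both the repository and the dataset.
        config.put("kite.dataset.uri", "dataset:hdfs://namenode:8020/tmp/repo/events");
        // Deprecated style, consulted only when kite.dataset.uri is absent:
        // config.put("kite.repo.uri", "repo:hdfs://namenode:8020/tmp/repo");
        // config.put("kite.dataset.name", "events");
        return config;
      }
    }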

http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index ed1b8d0..8f3ae51 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -34,9 +34,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -52,6 +54,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetRepositories;
 import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.Datasets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,12 +71,13 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
   static Configuration conf = new Configuration();
 
+  private String datasetURI = null;
   private String repositoryURI = null;
   private String datasetName = null;
   private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
 
-  private Dataset<Object> targetDataset = null;
-  private DatasetWriter<Object> writer = null;
+  private Dataset<GenericRecord> targetDataset = null;
+  private DatasetWriter<GenericRecord> writer = null;
   private UserGroupInformation login = null;
   private SinkCounter counter = null;
 
@@ -82,16 +86,18 @@ public class DatasetSink extends AbstractSink implements Configurable {
   private long lastRolledMs = 0l;
 
   // for working with avro serialized records
-  private Object datum = null;
+  private GenericRecord datum = null;
+  // TODO: remove this after PARQUET-62 is released
+  private boolean reuseDatum = true;
   private BinaryDecoder decoder = null;
-  private LoadingCache<Schema, ReflectDatumReader<Object>> readers =
+  private LoadingCache<Schema, DatumReader<GenericRecord>> readers =
       CacheBuilder.newBuilder()
-      .build(new CacheLoader<Schema, ReflectDatumReader<Object>>() {
+      .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
         @Override
-        public ReflectDatumReader<Object> load(Schema schema) {
+        public DatumReader<GenericRecord> load(Schema schema) {
           // must use the target dataset's schema for reading to ensure the
           // records are able to be stored using it
-          return new ReflectDatumReader<Object>(
+          return new GenericDatumReader<GenericRecord>(
               schema, targetDataset.getDescriptor().getSchema());
         }
       });
@@ -129,7 +135,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
       });
 
   protected List<String> allowedFormats() {
-    return Lists.newArrayList("avro");
+    return Lists.newArrayList("avro", "parquet");
   }
 
   @Override
@@ -144,25 +150,39 @@ public class DatasetSink extends AbstractSink implements Configurable {
       this.login = KerberosUtil.proxyAs(effectiveUser, login);
     }
 
-    this.repositoryURI = context.getString(
-        DatasetSinkConstants.CONFIG_KITE_REPO_URI);
-    Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
-    this.datasetName = context.getString(
-        DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
-    Preconditions.checkNotNull(datasetName, "Dataset name is missing");
-
-    this.targetDataset = KerberosUtil.runPrivileged(login,
-        new PrivilegedExceptionAction<Dataset<Object>>() {
-      @Override
-      public Dataset<Object> run() {
-        return DatasetRepositories.open(repositoryURI).load(datasetName);
-      }
-    });
+    this.datasetURI = context.getString(
+        DatasetSinkConstants.CONFIG_KITE_DATASET_URI);
+    if (datasetURI != null) {
+      this.targetDataset = KerberosUtil.runPrivileged(login,
+          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
+            @Override
+            public Dataset<GenericRecord> run() {
+              return Datasets.load(datasetURI);
+            }
+          });
+    } else {
+      this.repositoryURI = context.getString(
+          DatasetSinkConstants.CONFIG_KITE_REPO_URI);
+      Preconditions.checkNotNull(repositoryURI, "Repository URI is missing");
+      this.datasetName = context.getString(
+          DatasetSinkConstants.CONFIG_KITE_DATASET_NAME);
+      Preconditions.checkNotNull(datasetName, "Dataset name is missing");
+
+      this.targetDataset = KerberosUtil.runPrivileged(login,
+          new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
+            @Override
+            public Dataset<GenericRecord> run() {
+              return DatasetRepositories.open(repositoryURI).load(datasetName);
+            }
+          });
+    }
 
     String formatName = targetDataset.getDescriptor().getFormat().getName();
     Preconditions.checkArgument(allowedFormats().contains(formatName),
         "Unsupported format: " + formatName);
 
+    this.reuseDatum = !("parquet".equals(formatName));
+
     // other configuration
     this.batchSize = context.getLong(
         DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE,
@@ -176,7 +196,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
   @Override
   public synchronized void start() {
-    this.writer = openWriter(targetDataset);
+    this.writer = targetDataset.newWriter();
     this.lastRolledMs = System.currentTimeMillis();
     counter.start();
     // signal that this sink is ready to process
@@ -219,7 +239,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
     if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) {
       // close the current writer and get a new one
       writer.close();
-      this.writer = openWriter(targetDataset);
+      this.writer = targetDataset.newWriter();
       this.lastRolledMs = System.currentTimeMillis();
       LOG.info("Rolled writer for dataset: " + datasetName);
     }
@@ -238,7 +258,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
           break;
         }
 
-        this.datum = deserialize(event, datum);
+        this.datum = deserialize(event, reuseDatum ? datum : null);
 
         // writeEncoded would be an optimization in some cases, but HBase
         // will not support it and partitioned Datasets need to get partition
@@ -302,11 +322,11 @@ public class DatasetSink extends AbstractSink implements Configurable {
    * @param reuse
    * @return
    */
-  private Object deserialize(Event event, Object reuse)
+  private GenericRecord deserialize(Event event, GenericRecord reuse)
       throws EventDeliveryException {
     decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
     // no checked exception is thrown in the CacheLoader
-    ReflectDatumReader<Object> reader = readers.getUnchecked(schema(event));
+    DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event));
     try {
       return reader.read(reuse, decoder);
     } catch (IOException ex) {
@@ -330,10 +350,4 @@ public class DatasetSink extends AbstractSink implements Configurable {
     }
   }
 
-  private static DatasetWriter<Object> openWriter(Dataset<Object> target) {
-    DatasetWriter<Object> writer = target.newWriter();
-    writer.open();
-    return writer;
-  }
-
 }
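
As a companion to the configure() change above, a rough standalone sketch (placeholder arguments, and the sink's Kerberos proxying omitted) of the two loading paths now supported:

    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetRepositories;
    import org.kitesdk.data.Datasets;

    public class LoadDatasetSketch {
      public static Dataset<GenericRecord> load(String datasetURI,
          String repositoryURI, String datasetName) {
        if (datasetURI != null) {
          // Preferred path: the dataset URI carries both repository and name.
          return Datasets.load(datasetURI);
        }
        // Deprecated path: open the repository, then look the dataset up by name.
        return DatasetRepositories.open(repositoryURI).load(datasetName);
      }
    }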

http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
index 09dfab6..3c67738 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
@@ -22,6 +22,11 @@ public class DatasetSinkConstants {
+  /**
+   * URI of the Kite Dataset to write records into.
+   */
+  public static final String CONFIG_KITE_DATASET_URI = "kite.dataset.uri";
+
   /**
    * URI of the Kite DatasetRepository.
    */
   public static final String CONFIG_KITE_REPO_URI = "kite.repo.uri";
 
   /**
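
The reader change in DatasetSink above (ReflectDatumReader replaced by GenericDatumReader, plus the new reuseDatum flag) can be pictured in isolation. A hedged sketch of the decode step, with the event body and schemas passed in directly instead of being pulled from headers and the sink's readers cache:

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;

    public class DecodeSketch {
      public static GenericRecord decode(byte[] body, Schema writerSchema,
          Schema datasetSchema, GenericRecord reuse, boolean reuseDatum)
          throws IOException {
        // Writer schema comes from the event, reader schema from the target
        // dataset, so records are projected into the stored form.
        DatumReader<GenericRecord> reader =
            new GenericDatumReader<GenericRecord>(writerSchema, datasetSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(body, null);
        // Parquet writers keep references to records until a row group is
        // flushed, so the sink only reuses the datum for Avro datasets
        // (see the TODO referencing PARQUET-62 above).
        return reader.read(reuseDatum ? reuse : null, decoder);
      }
    }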

http://git-wip-us.apache.org/repos/asf/flume/blob/75418384/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index 51dd408..b448b50 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
@@ -63,12 +64,15 @@ import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetRepositories;
 import org.kitesdk.data.DatasetRepository;
+import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
 
 public class TestDatasetSink {
 
   public static final String FILE_REPO_URI = "repo:file:target/test-repo";
   public static final String DATASET_NAME = "test";
+  public static final String FILE_DATASET_URI =
+      "dataset:file:target/test-repo/" + DATASET_NAME;
   public static final DatasetRepository REPO = DatasetRepositories
       .open(FILE_REPO_URI);
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
@@ -114,15 +118,15 @@ public class TestDatasetSink {
 
   @Before
   public void setup() throws EventDeliveryException {
+    REPO.delete(DATASET_NAME);
     REPO.create(DATASET_NAME, DESCRIPTOR);
 
     this.config = new Context();
-    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI);
-    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME);
-
     this.in = new MemoryChannel();
     Configurables.configure(in, config);
 
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);
+
     GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
     expected = Lists.newArrayList(
         builder.set("id", "1").set("msg", "msg1").build(),
@@ -148,6 +152,44 @@ public class TestDatasetSink {
   }
 
   @Test
+  public void testOldConfig() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, null);
+    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI);
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME);
+
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testDatasetUriOverridesOldConfig() throws EventDeliveryException {
+    // CONFIG_KITE_DATASET_URI is still set, otherwise this will cause an error
+    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, "bad uri");
+    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "");
+
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
   public void testFileStore() throws EventDeliveryException {
     DatasetSink sink = sink(in, config);
 
@@ -163,6 +205,26 @@ public class TestDatasetSink {
   }
 
   @Test
+  public void testParquetDataset() throws EventDeliveryException {
+    Datasets.delete(FILE_DATASET_URI);
+    Dataset<GenericData.Record> created = Datasets.create(FILE_DATASET_URI,
+        new DatasetDescriptor.Builder(DESCRIPTOR)
+            .format("parquet")
+            .build(),
+        GenericData.Record.class);
+
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(Sets.newHashSet(expected), read(created));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
   public void testPartitionedData() throws EventDeliveryException {
     REPO.create("partitioned", new DatasetDescriptor.Builder(DESCRIPTOR)
         .partitionStrategy(new PartitionStrategy.Builder()
@@ -171,7 +233,8 @@ public class TestDatasetSink {
         .build());
 
     try {
-      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "partitioned");
+      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
+          "dataset:file:target/test-repo/partitioned");
       DatasetSink sink = sink(in, config);
 
       // run the sink
@@ -208,7 +271,8 @@ public class TestDatasetSink {
       hdfsRepo.create(DATASET_NAME, DESCRIPTOR);
 
       // update the config to use the HDFS repository
-      config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, repoURI);
+      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
+          "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo/" + DATASET_NAME);
 
       DatasetSink sink = sink(in, config);
 
@@ -367,12 +431,14 @@ public class TestDatasetSink {
   }
 
   public static <T> HashSet<T> read(Dataset<T> dataset) {
-    DatasetReader<T> reader = dataset.newReader();
+    DatasetReader<T> reader = null;
     try {
-      reader.open();
+      reader = dataset.newReader();
       return Sets.newHashSet(reader.iterator());
     } finally {
-      reader.close();
+      if (reader != null) {
+        reader.close();
+      }
     }
   }
 

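One API detail worth calling out from the test and sink changes: with this Kite version, newWriter() and newReader() appear to return already-open instances, so the explicit open() calls were dropped and the test helper now creates the reader inside the try block and null-checks it in finally. A generic sketch of that pattern:

    import java.util.HashSet;
    import com.google.common.collect.Sets;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetReader;

    public class ReadAllSketch {
      public static <T> HashSet<T> readAll(Dataset<T> dataset) {
        DatasetReader<T> reader = null;
        try {
          // newReader() hands back a reader that is already open here.
          reader = dataset.newReader();
          return Sets.newHashSet(reader.iterator());
        } finally {
          // Guard the close so a failure in newReader() does not NPE.
          if (reader != null) {
            reader.close();
          }
        }
      }
    }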