This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a51d7a0  FLUME-3356 Upgrade Avro to 1.9.2
a51d7a0 is described below

commit a51d7a05e14381bcde73705d8c51df4bfd1919dc
Author: Ralph Goers <[email protected]>
AuthorDate: Wed Jan 12 16:10:23 2022 -0700

    FLUME-3356 Upgrade Avro to 1.9.2
---
 conf/log4j2.xml                                    |    2 +-
 flume-ng-core/pom.xml                              |    9 +-
 .../java/org/apache/flume/source/AvroSource.java   |    4 +-
 .../java/org/apache/flume/sink/TestAvroSink.java   |   46 +-
 .../org/apache/flume/source/TestAvroSource.java    |    9 +-
 .../flume/agent/embedded/TestEmbeddedAgent.java    |    8 +-
 flume-ng-legacy-sources/flume-avro-source/pom.xml  |   10 +-
 .../flume/source/avroLegacy/AvroLegacySource.java  |   12 +-
 flume-ng-sdk/pom.xml                               |   11 +-
 .../org/apache/flume/api/NettyAvroRpcClient.java   |   11 +-
 .../java/org/apache/flume/api/RpcTestUtils.java    |   50 +-
 flume-ng-sinks/flume-dataset-sink/pom.xml          |  132 ---
 .../org/apache/flume/sink/kite/DatasetSink.java    |  588 -----------
 .../flume/sink/kite/DatasetSinkConstants.java      |  132 ---
 .../sink/kite/NonRecoverableEventException.java    |   52 -
 .../apache/flume/sink/kite/parser/AvroParser.java  |  208 ----
 .../flume/sink/kite/parser/EntityParser.java       |   56 --
 .../sink/kite/parser/EntityParserFactory.java      |   81 --
 .../flume/sink/kite/policy/FailurePolicy.java      |  105 --
 .../sink/kite/policy/FailurePolicyFactory.java     |   81 --
 .../apache/flume/sink/kite/policy/RetryPolicy.java |   63 --
 .../apache/flume/sink/kite/policy/SavePolicy.java  |  128 ---
 .../apache/flume/sink/kite/TestDatasetSink.java    | 1036 --------------------
 .../src/test/resources/enable-kerberos.xml         |   30 -
 flume-ng-sinks/flume-ng-hbase-sink/pom.xml         |    5 -
 flume-ng-sinks/flume-ng-hbase2-sink/pom.xml        |    5 -
 .../org/apache/flume/sink/kafka/TestKafkaSink.java |   12 +-
 .../org/apache/flume/sink/kafka/util/TestUtil.java |   23 +-
 flume-ng-sinks/pom.xml                             |    1 -
 .../source/kafka/KafkaSourceEmbeddedKafka.java     |   23 +-
 pom.xml                                            |   69 +-
 31 files changed, 159 insertions(+), 2843 deletions(-)

diff --git a/conf/log4j2.xml b/conf/log4j2.xml
index d203a24..d3efac4 100644
--- a/conf/log4j2.xml
+++ b/conf/log4j2.xml
@@ -46,7 +46,7 @@
   <Loggers>
     <Logger name="org.apache.flume.lifecycle" level="info"/>
     <Logger name="org.jboss" level="WARN"/>
-    <Logger name="org.apache.avro.ipc.NettyTransceiver" level="WARN"/>
+    <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
     <Logger name="org.apache.hadoop" level="INFO"/>
     <Logger name="org.apache.hadoop.hive" level="ERROR"/>
     <Root level="INFO">
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index b2a90db..bd81c4b 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
   <properties>
     <!-- TODO fix spotbugs violations -->
     <spotbugs.maxAllowedViolations>115</spotbugs.maxAllowedViolations>
-    <pmd.maxAllowedViolations>112</pmd.maxAllowedViolations>
+    <pmd.maxAllowedViolations>121</pmd.maxAllowedViolations>
   </properties>
 
   <build>
@@ -324,12 +324,7 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.avro</groupId>
-      <artifactId>avro-ipc</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>avro-ipc-netty</artifactId>
     </dependency>
 
     <dependency>
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index ac3d51d..6342aa9 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -20,8 +20,8 @@
 package org.apache.flume.source;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.netty.NettyServer;
+import org.apache.avro.ipc.netty.NettyTransceiver;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index cc2c91a..bb62cff 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -20,8 +20,7 @@
 package org.apache.flume.sink;
 
 import com.google.common.base.Charsets;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Channel;
@@ -810,21 +809,20 @@ public class TestAvroSink {
   private static class MockAvroServer implements AvroSourceProtocol {
 
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public Status append(AvroFlumeEvent event) {
       logger.debug("Received event:{}", event);
       return Status.OK;
     }
 
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events)
-        throws AvroRemoteException {
+    public Status appendBatch(List<AvroFlumeEvent> events) {
       logger.debug("Received event batch:{}", events);
       return Status.OK;
     }
 
   }
 
-  private static class DelayMockAvroServer implements AvroSourceProtocol {
+  private static class DelayMockAvroServer implements 
AvroSourceProtocol.Callback {
 
     private final AtomicLong delay;
 
@@ -832,29 +830,51 @@ public class TestAvroSink {
       this.delay = delay;
     }
 
-    private void sleep() throws AvroRemoteException {
+    private void sleep() throws IOException {
       try {
         Thread.sleep(delay.get());
       } catch (InterruptedException e) {
-        throw new AvroRemoteException("Interrupted while sleeping", e);
+        throw new IOException("Interrupted while sleeping", e);
       }
     }
 
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public Status append(AvroFlumeEvent event) {
       logger.debug("Received event:{}; delaying for {}ms", event, delay);
-      sleep();
+      try {
+        sleep();
+      } catch (IOException e) {
+        logger.debug("IOException detected");
+      }
       return Status.OK;
     }
 
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events)
-        throws AvroRemoteException {
-      logger.debug("Received event batch:{}; delaying for {}ms", events, 
delay);
+    public void append(AvroFlumeEvent event,
+                   org.apache.avro.ipc.Callback<Status> status)
+            throws IOException {
+      logger.debug("Received event:{}; delaying for {}ms", event, delay);
       sleep();
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) {
+      logger.debug("Received event batch:{}; delaying for {}ms", events, 
delay);
+      try {
+        sleep();
+      } catch (IOException e) {
+        logger.debug("IOException detected");
+      }
       return Status.OK;
     }
 
+    @Override
+    public void appendBatch(List<AvroFlumeEvent> events,
+                         org.apache.avro.ipc.Callback<Status> status)
+            throws IOException {
+      logger.debug("Received event batch:{}; delaying for {}ms", events, 
delay);
+      sleep();
+    }
   }
 
   private Server createSslServer(AvroSourceProtocol protocol)
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index 21e65ad..b9aab53 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -39,7 +39,8 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
-import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.ipc.netty.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
@@ -218,13 +219,13 @@ public class TestAvroSource {
     doRequest(true, true, 9);
   }
 
-  @Test(expected = org.apache.avro.AvroRemoteException.class)
+  @Test(expected = org.apache.avro.AvroRuntimeException.class)
   public void testRequestWithCompressionOnServerOnly() throws 
InterruptedException, IOException {
     //This will fail because both client and server need compression on
     doRequest(true, false, 6);
   }
 
-  @Test(expected = org.apache.avro.AvroRemoteException.class)
+  @Test(expected = org.apache.avro.AvroRuntimeException.class)
   public void testRequestWithCompressionOnClientOnly() throws 
InterruptedException, IOException {
     //This will fail because both client and server need compression on
     doRequest(false, true, 6);
@@ -585,7 +586,7 @@ public class TestAvroSource {
       Status status = client.append(avroEvent);
       logger.info("Client appended");
       Assert.assertEquals(Status.OK, status);
-    } catch (IOException e) {
+    } catch (IOException | AvroRuntimeException e) {
       Assert.assertTrue("Should have been allowed: " + ruleDefinition,
           !eventShouldBeAllowed);
       return;
diff --git 
a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
 
b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
index 5bb7cf1..68a3a07 100644
--- 
a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
+++ 
b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
@@ -28,8 +28,7 @@ import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Event;
@@ -204,13 +203,12 @@ public class TestEmbeddedAgent {
       return null;
     }
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public Status append(AvroFlumeEvent event) {
       eventQueue.add(event);
       return Status.OK;
     }
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events)
-        throws AvroRemoteException {
+    public Status appendBatch(List<AvroFlumeEvent> events) {
       Preconditions.checkState(eventQueue.addAll(events));
       return Status.OK;
     }
diff --git a/flume-ng-legacy-sources/flume-avro-source/pom.xml 
b/flume-ng-legacy-sources/flume-avro-source/pom.xml
index b592744..9d9e9dc 100644
--- a/flume-ng-legacy-sources/flume-avro-source/pom.xml
+++ b/flume-ng-legacy-sources/flume-avro-source/pom.xml
@@ -32,7 +32,8 @@ limitations under the License.
 
   <properties>
     <!-- TODO fix spotbugs violations -->
-    <spotbugs.maxAllowedViolations>2</spotbugs.maxAllowedViolations>
+    <spotbugs.maxAllowedViolations>5</spotbugs.maxAllowedViolations>
+       <pmd.maxAllowedViolations>37</pmd.maxAllowedViolations>
     <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
   </properties>
 
@@ -131,7 +132,12 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.avro</groupId>
-      <artifactId>avro-ipc</artifactId>
+      <artifactId>avro-ipc-netty</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc-jetty</artifactId>
     </dependency>
 
   </dependencies>
diff --git 
a/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
 
b/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
index dde8f28..5e87193 100644
--- 
a/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
+++ 
b/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.jetty.HttpServer;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
@@ -38,7 +38,6 @@ import org.apache.flume.source.AbstractSource;
 
 import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
 import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
-import org.apache.avro.AvroRemoteException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flume.ChannelException;
@@ -122,15 +121,15 @@ public class AvroLegacySource extends AbstractSource 
implements
   }
 
   @Override
-  public Void append( AvroFlumeOGEvent evt ) throws AvroRemoteException {
+  public void append( AvroFlumeOGEvent evt ) {
     counterGroup.incrementAndGet("rpc.received");
     Map<String, String> headers = new HashMap<String, String>();
 
     // extract Flume OG event headers
     headers.put(HOST, evt.getHost().toString());
-    headers.put(TIMESTAMP, evt.getTimestamp().toString());
+    headers.put(TIMESTAMP, Long.toString(evt.getTimestamp()));
     headers.put(PRIORITY, evt.getPriority().toString());
-    headers.put(NANOS, evt.getNanos().toString());
+    headers.put(NANOS, Long.toString(evt.getNanos()));
     for (Entry<CharSequence, ByteBuffer> entry : evt.getFields().entrySet()) {
       headers.put(entry.getKey().toString(), entry.getValue().toString());
     }
@@ -141,11 +140,10 @@ public class AvroLegacySource extends AbstractSource 
implements
       getChannelProcessor().processEvent(event);
       counterGroup.incrementAndGet("rpc.events");
     } catch (ChannelException ex) {
-      return null;
+      return;
     }
 
     counterGroup.incrementAndGet("rpc.successful");
-    return null;
   }
 
   @Override
diff --git a/flume-ng-sdk/pom.xml b/flume-ng-sdk/pom.xml
index 0376684..85ff8a4 100644
--- a/flume-ng-sdk/pom.xml
+++ b/flume-ng-sdk/pom.xml
@@ -30,8 +30,8 @@ limitations under the License.
 
   <properties>
     <!-- TODO fix spotbugs/pmd violations -->
-    <spotbugs.maxAllowedViolations>67</spotbugs.maxAllowedViolations>
-    <pmd.maxAllowedViolations>131</pmd.maxAllowedViolations>
+    <spotbugs.maxAllowedViolations>69</spotbugs.maxAllowedViolations>
+    <pmd.maxAllowedViolations>170</pmd.maxAllowedViolations>
   </properties>
 
   <profiles>
@@ -227,7 +227,7 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.avro</groupId>
-      <artifactId>avro-ipc</artifactId>
+      <artifactId>avro-ipc-netty</artifactId>
     </dependency>
 
     <dependency>
@@ -240,5 +240,10 @@ limitations under the License.
       <artifactId>libthrift</artifactId>
     </dependency>
 
+         <dependency>
+           <groupId>commons-lang</groupId>
+           <artifactId>commons-lang</artifactId>
+         </dependency>
+
   </dependencies>
 </project>
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index cf52f6e..06d364f 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -56,7 +56,7 @@ import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
 
 import org.apache.avro.ipc.CallFuture;
-import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.netty.NettyTransceiver;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.commons.lang.StringUtils;
@@ -704,10 +704,11 @@ public class NettyAvroRpcClient extends 
SSLContextAwareAbstractRpcClient {
             KeyStore keystore = null;
 
             if (truststore != null) {
-              InputStream truststoreStream = new FileInputStream(truststore);
-              keystore = KeyStore.getInstance(truststoreType);
-              keystore.load(truststoreStream,
-                  truststorePassword != null ? 
truststorePassword.toCharArray() : null);
+              try (InputStream truststoreStream = new 
FileInputStream(truststore)) {
+                keystore = KeyStore.getInstance(truststoreType);
+                keystore.load(truststoreStream,
+                    truststorePassword != null ? 
truststorePassword.toCharArray() : null);
+              }
             }
 
             TrustManagerFactory tmf = 
TrustManagerFactory.getInstance("SunX509");
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java 
b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
index d9355f7..6750352 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
@@ -19,8 +19,8 @@
 package org.apache.flume.api;
 
 import junit.framework.Assert;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
+
+import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
@@ -244,7 +244,7 @@ public class RpcTestUtils {
     }
 
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public Status append(AvroFlumeEvent event) {
       if (failed) {
         logger.debug("Event rejected");
         return Status.FAILED;
@@ -256,8 +256,7 @@ public class RpcTestUtils {
     }
 
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events) throws
-        AvroRemoteException {
+    public Status appendBatch(List<AvroFlumeEvent> events) {
       if (failed) {
         logger.debug("Event batch rejected");
         return Status.FAILED;
@@ -276,15 +275,14 @@ public class RpcTestUtils {
   public static class OKAvroHandler implements AvroSourceProtocol {
 
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public Status append(AvroFlumeEvent event) {
       logger.info("OK: Received event from append(): {}",
           new String(event.getBody().array(), Charset.forName("UTF8")));
       return Status.OK;
     }
 
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events) throws
-        AvroRemoteException {
+    public Status appendBatch(List<AvroFlumeEvent> events) {
       logger.info("OK: Received {} events from appendBatch()",
           events.size());
       return Status.OK;
@@ -298,14 +296,14 @@ public class RpcTestUtils {
   public static class FailedAvroHandler implements AvroSourceProtocol {
 
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public Status append(AvroFlumeEvent event) {
       logger.info("Failed: Received event from append(): {}",
                   new String(event.getBody().array(), 
Charset.forName("UTF8")));
       return Status.FAILED;
     }
 
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events) throws 
AvroRemoteException {
+    public Status appendBatch(List<AvroFlumeEvent> events) {
       logger.info("Failed: Received {} events from appendBatch()", 
events.size());
       return Status.FAILED;
     }
@@ -318,14 +316,14 @@ public class RpcTestUtils {
   public static class UnknownAvroHandler implements AvroSourceProtocol {
 
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public Status append(AvroFlumeEvent event) {
       logger.info("Unknown: Received event from append(): {}",
                   new String(event.getBody().array(), 
Charset.forName("UTF8")));
       return Status.UNKNOWN;
     }
 
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events) throws 
AvroRemoteException {
+    public Status appendBatch(List<AvroFlumeEvent> events) {
       logger.info("Unknown: Received {} events from appendBatch()",
                   events.size());
       return Status.UNKNOWN;
@@ -336,19 +334,37 @@ public class RpcTestUtils {
   /**
    * A service that logs receipt of the request and then throws an exception
    */
-  public static class ThrowingAvroHandler implements AvroSourceProtocol {
+  public static class ThrowingAvroHandler
+           implements AvroSourceProtocol.Callback {
 
     @Override
-    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+    public void append(AvroFlumeEvent event,
+                       org.apache.avro.ipc.Callback<Status> callback)
+            throws java.io.IOException {
       logger.info("Throwing: Received event from append(): {}",
                   new String(event.getBody().array(), 
Charset.forName("UTF8")));
-      throw new AvroRemoteException("Handler smash!");
+      throw new java.io.IOException("Handler smash!");
+    }
+
+    @Override
+    public Status append(AvroFlumeEvent event) {
+      logger.info("Throwing unavailable: Received event from append(): {}",
+             new String(event.getBody().array(), Charset.forName("UTF8")));
+      return null;
     }
 
     @Override
-    public Status appendBatch(List<AvroFlumeEvent> events) throws 
AvroRemoteException {
+    public void appendBatch(List<AvroFlumeEvent> events,
+                       org.apache.avro.ipc.Callback<Status> callback)
+            throws java.io.IOException {
       logger.info("Throwing: Received {} events from appendBatch()", 
events.size());
-      throw new AvroRemoteException("Handler smash!");
+      throw new java.io.IOException("Handler smash!");
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) {
+      logger.info("Throwing unavailable: Received {} events from 
appendBatch()", events.size());
+      return null;
     }
   }
 
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml 
b/flume-ng-sinks/flume-dataset-sink/pom.xml
deleted file mode 100644
index 56fcbd5..0000000
--- a/flume-ng-sinks/flume-dataset-sink/pom.xml
+++ /dev/null
@@ -1,132 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <artifactId>flume-ng-sinks</artifactId>
-    <groupId>org.apache.flume</groupId>
-    <version>1.10.0-SNAPSHOT</version>
-  </parent>
-
-  <groupId>org.apache.flume.flume-ng-sinks</groupId>
-  <artifactId>flume-dataset-sink</artifactId>
-  <name>Flume NG Kite Dataset Sink</name>
-
-  <properties>
-    <!-- TODO fix spotbugspmd violations -->
-    <spotbugs.maxAllowedViolations>8</spotbugs.maxAllowedViolations>
-    <pmd.maxAllowedViolations>11</pmd.maxAllowedViolations>
-  </properties>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-configuration</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.kitesdk</groupId>
-      <artifactId>kite-data-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.kitesdk</groupId>
-      <artifactId>kite-data-hive</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.kitesdk</groupId>
-      <artifactId>kite-data-hbase</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-metastore</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
-      <!-- build will fail if this is not hadoop-common 2.*
-      because kite uses hflush.
-      -->
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-1.2-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-
-</project>
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
deleted file mode 100644
index 4a44264..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.kite;
-
-import org.apache.flume.auth.FlumeAuthenticationUtil;
-import org.apache.flume.auth.PrivilegedExecutor;
-import org.apache.flume.conf.BatchSizeSupported;
-import org.apache.flume.sink.kite.parser.EntityParserFactory;
-import org.apache.flume.sink.kite.parser.EntityParser;
-import org.apache.flume.sink.kite.policy.FailurePolicy;
-import org.apache.flume.sink.kite.policy.FailurePolicyFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetIOException;
-import org.kitesdk.data.DatasetNotFoundException;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Flushable;
-import org.kitesdk.data.Syncable;
-import org.kitesdk.data.View;
-import org.kitesdk.data.spi.Registration;
-import org.kitesdk.data.URIBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-import org.kitesdk.data.Format;
-import org.kitesdk.data.Formats;
-
-/**
- * Sink that writes events to a Kite Dataset. This sink will parse the body of
- * each incoming event and store the resulting entity in a Kite Dataset. It
- * determines the destination Dataset by opening a dataset URI
- * {@code kite.dataset.uri} or opening a repository URI, {@code kite.repo.uri},
- * and loading a Dataset by name, {@code kite.dataset.name}, and namespace,
- * {@code kite.dataset.namespace}.
- */
-public class DatasetSink extends AbstractSink implements Configurable, 
BatchSizeSupported {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
-
-  private Context context = null;
-  private PrivilegedExecutor privilegedExecutor;
-
-  private String datasetName = null;
-  private URI datasetUri = null;
-  private Schema datasetSchema = null;
-  private DatasetWriter<GenericRecord> writer = null;
-
-  /**
-   * The number of events to process as a single batch.
-   */
-  private long batchSize = DEFAULT_BATCH_SIZE;
-
-  /**
-   * The number of seconds to wait before rolling a writer.
-   */
-  private int rollIntervalSeconds = DEFAULT_ROLL_INTERVAL;
-
-  /**
-   * Flag that says if Flume should commit on every batch.
-   */
-  private boolean commitOnBatch = DEFAULT_FLUSHABLE_COMMIT_ON_BATCH;
-
-  /**
-   * Flag that says if Flume should sync on every batch.
-   */
-  private boolean syncOnBatch = DEFAULT_SYNCABLE_SYNC_ON_BATCH;
-
-  /**
-   * The last time the writer rolled.
-   */
-  private long lastRolledMillis = 0L;
-
-  /**
-   * The raw number of bytes parsed.
-   */
-  private long bytesParsed = 0L;
-
-  /**
-   * A class for parsing Kite entities from Flume Events.
-   */
-  private EntityParser<GenericRecord> parser = null;
-
-  /**
-   * A class implementing a failure newPolicy for events that had a
- non-recoverable error during processing.
-   */
-  private FailurePolicy failurePolicy = null;
-
-  private SinkCounter counter = null;
-
-  /**
-   * The Kite entity
-   */
-  private GenericRecord entity = null;
-  // TODO: remove this after PARQUET-62 is released
-  private boolean reuseEntity = true;
-
-  /**
-   * The Flume transaction. Used to keep transactions open across calls to
-   * process.
-   */
-  private Transaction transaction = null;
-
-  /**
-   * Internal flag on if there has been a batch of records committed. This is
-   * used during rollback to know if the current writer needs to be closed.
-   */
-  private boolean committedBatch = false;
-
-  // Factories
-  private static final EntityParserFactory ENTITY_PARSER_FACTORY =
-      new EntityParserFactory();
-  private static final FailurePolicyFactory FAILURE_POLICY_FACTORY =
-      new FailurePolicyFactory();
-
-  /**
-   * Return the list of allowed formats.
-   * @return The list of allowed formats.
-   */
-  protected List<String> allowedFormats() {
-    return Lists.newArrayList("avro", "parquet");
-  }
-
-  @Override
-  public void configure(Context context) {
-    this.context = context;
-
-    String principal = context.getString(AUTH_PRINCIPAL);
-    String keytab = context.getString(AUTH_KEYTAB);
-    String effectiveUser = context.getString(AUTH_PROXY_USER);
-
-    this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
-            principal, keytab).proxyAs(effectiveUser);
-
-    // Get the dataset URI and name from the context
-    String datasetURI = context.getString(CONFIG_KITE_DATASET_URI);
-    if (datasetURI != null) {
-      this.datasetUri = URI.create(datasetURI);
-      this.datasetName = uriToName(datasetUri);
-    } else {
-      String repositoryURI = context.getString(CONFIG_KITE_REPO_URI);
-      Preconditions.checkNotNull(repositoryURI, "No dataset configured. 
Setting "
-          + CONFIG_KITE_DATASET_URI + " is required.");
-
-      this.datasetName = context.getString(CONFIG_KITE_DATASET_NAME);
-      Preconditions.checkNotNull(datasetName, "No dataset configured. Setting "
-          + CONFIG_KITE_DATASET_URI + " is required.");
-
-      String namespace = context.getString(CONFIG_KITE_DATASET_NAMESPACE,
-          DEFAULT_NAMESPACE);
-
-      this.datasetUri = new URIBuilder(repositoryURI, namespace, datasetName)
-          .build();
-    }
-    this.setName(datasetUri.toString());
-
-    if (context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH,
-        DEFAULT_SYNCABLE_SYNC_ON_BATCH)) {
-      Preconditions.checkArgument(
-          context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
-              DEFAULT_FLUSHABLE_COMMIT_ON_BATCH), "Configuration error: "
-                  + CONFIG_FLUSHABLE_COMMIT_ON_BATCH + " must be set to true 
when "
-                  + CONFIG_SYNCABLE_SYNC_ON_BATCH + " is set to true.");
-    }
-
-    // Create the configured failure failurePolicy
-    this.failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
-
-    // other configuration
-    this.batchSize = context.getLong(CONFIG_KITE_BATCH_SIZE,
-        DEFAULT_BATCH_SIZE);
-    this.rollIntervalSeconds = context.getInteger(CONFIG_KITE_ROLL_INTERVAL,
-        DEFAULT_ROLL_INTERVAL);
-
-    this.counter = new SinkCounter(datasetName);
-  }
-
-  @Override
-  public synchronized void start() {
-    this.lastRolledMillis = System.currentTimeMillis();
-    counter.start();
-    // signal that this sink is ready to process
-    LOG.info("Started DatasetSink " + getName());
-    super.start();
-  }
-
-  /**
-   * Causes the sink to roll at the next {@link #process()} call.
-   */
-  @VisibleForTesting
-  void roll() {
-    this.lastRolledMillis = 0L;
-  }
-
-  @VisibleForTesting
-  DatasetWriter<GenericRecord> getWriter() {
-    return writer;
-  }
-
-  @VisibleForTesting
-  void setWriter(DatasetWriter<GenericRecord> writer) {
-    this.writer = writer;
-  }
-
-  @VisibleForTesting
-  void setParser(EntityParser<GenericRecord> parser) {
-    this.parser = parser;
-  }
-
-  @VisibleForTesting
-  void setFailurePolicy(FailurePolicy failurePolicy) {
-    this.failurePolicy = failurePolicy;
-  }
-
-  @Override
-  public synchronized void stop() {
-    counter.stop();
-
-    try {
-      // Close the writer and commit the transaction, but don't create a new
-      // writer since we're stopping
-      closeWriter();
-      commitTransaction();
-    } catch (EventDeliveryException ex) {
-      rollbackTransaction();
-
-      LOG.warn("Closing the writer failed: " + ex.getLocalizedMessage());
-      LOG.debug("Exception follows.", ex);
-      // We don't propogate the exception as the transaction would have been
-      // rolled back and we can still finish stopping
-    }
-
-  // signal that this sink has stopped
-    LOG.info("Stopped dataset sink: " + getName());
-    super.stop();
-  }
-
-  @Override
-  public Status process() throws EventDeliveryException {
-    long processedEvents = 0;
-
-    try {
-      if (shouldRoll()) {
-        closeWriter();
-        commitTransaction();
-        createWriter();
-      }
-
-      // The writer shouldn't be null at this point
-      Preconditions.checkNotNull(writer,
-          "Can't process events with a null writer. This is likely a bug.");
-      Channel channel = getChannel();
-
-      // Enter the transaction boundary if we haven't already
-      enterTransaction(channel);
-
-      for (; processedEvents < batchSize; processedEvents += 1) {
-        Event event = channel.take();
-
-        if (event == null) {
-          // no events available in the channel
-          break;
-        }
-
-        write(event);
-      }
-
-      // commit transaction
-      if (commitOnBatch) {
-        // Flush/sync before commiting. A failure here will result in rolling 
back
-        // the transaction
-        if (syncOnBatch && writer instanceof Syncable) {
-          ((Syncable) writer).sync();
-        } else if (writer instanceof Flushable) {
-          ((Flushable) writer).flush();
-        }
-        boolean committed = commitTransaction();
-        Preconditions.checkState(committed,
-            "Tried to commit a batch when there was no transaction");
-        committedBatch |= committed;
-      }
-    } catch (Throwable th) {
-      // catch-all for any unhandled Throwable so that the transaction is
-      // correctly rolled back.
-      rollbackTransaction();
-
-      if (commitOnBatch && committedBatch) {
-        try {
-          closeWriter();
-        } catch (EventDeliveryException ex) {
-          LOG.warn("Error closing writer there may be temp files that need to"
-              + " be manually recovered: " + ex.getLocalizedMessage());
-          LOG.debug("Exception follows.", ex);
-        }
-      } else {
-        this.writer = null;
-      }
-
-      // handle the exception
-      Throwables.propagateIfInstanceOf(th, Error.class);
-      Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
-      throw new EventDeliveryException(th);
-    }
-
-    if (processedEvents == 0) {
-      counter.incrementBatchEmptyCount();
-      return Status.BACKOFF;
-    } else if (processedEvents < batchSize) {
-      counter.incrementBatchUnderflowCount();
-    } else {
-      counter.incrementBatchCompleteCount();
-    }
-
-    counter.addToEventDrainSuccessCount(processedEvents);
-
-    return Status.READY;
-  }
-
-  /**
-   * Parse the event using the entity parser and write the entity to the 
dataset.
-   *
-   * @param event The event to write
-   * @throws EventDeliveryException An error occurred trying to write to the
-                                dataset that couldn't or shouldn't be
-                                handled by the failure policy.
-   */
-  @VisibleForTesting
-  void write(Event event) throws EventDeliveryException {
-    try {
-      this.entity = parser.parse(event, reuseEntity ? entity : null);
-      this.bytesParsed += event.getBody().length;
-
-      // writeEncoded would be an optimization in some cases, but HBase
-      // will not support it and partitioned Datasets need to get partition
-      // info from the entity Object. We may be able to avoid the
-      // serialization round-trip otherwise.
-      writer.write(entity);
-    } catch (NonRecoverableEventException ex) {
-      failurePolicy.handle(event, ex);
-    } catch (DataFileWriter.AppendWriteException ex) {
-      failurePolicy.handle(event, ex);
-    } catch (RuntimeException ex) {
-      Throwables.propagateIfInstanceOf(ex, EventDeliveryException.class);
-      throw new EventDeliveryException(ex);
-    }
-  }
-
-  /**
-   * Create a new writer.
-   *
-   * This method also re-loads the dataset so updates to the configuration or
-   * a dataset created after Flume starts will be loaded.
-   *
-   * @throws EventDeliveryException There was an error creating the writer.
-   */
-  @VisibleForTesting
-  void createWriter() throws EventDeliveryException {
-    // reset the commited flag whenever a new writer is created
-    committedBatch = false;
-    try {
-      View<GenericRecord> view;
-
-      view = privilegedExecutor.execute(
-        new PrivilegedAction<Dataset<GenericRecord>>() {
-          @Override
-          public Dataset<GenericRecord> run() {
-            return Datasets.load(datasetUri);
-          }
-        });
-
-      DatasetDescriptor descriptor = view.getDataset().getDescriptor();
-      Format format = descriptor.getFormat();
-      Preconditions.checkArgument(allowedFormats().contains(format.getName()),
-          "Unsupported format: " + format.getName());
-
-      Schema newSchema = descriptor.getSchema();
-      if (datasetSchema == null || !newSchema.equals(datasetSchema)) {
-        this.datasetSchema = descriptor.getSchema();
-        // dataset schema has changed, create a new parser
-        parser = ENTITY_PARSER_FACTORY.newParser(datasetSchema, context);
-      }
-
-      this.reuseEntity = !(Formats.PARQUET.equals(format));
-
-      // TODO: Check that the format implements Flushable after CDK-863
-      // goes in. For now, just check that the Dataset is Avro format
-      this.commitOnBatch = context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
-          DEFAULT_FLUSHABLE_COMMIT_ON_BATCH) && (Formats.AVRO.equals(format));
-
-      // TODO: Check that the format implements Syncable after CDK-863
-      // goes in. For now, just check that the Dataset is Avro format
-      this.syncOnBatch = context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH,
-          DEFAULT_SYNCABLE_SYNC_ON_BATCH) && (Formats.AVRO.equals(format));
-
-      this.datasetName = view.getDataset().getName();
-
-      this.writer = view.newWriter();
-
-      // Reset the last rolled time and the metrics
-      this.lastRolledMillis = System.currentTimeMillis();
-      this.bytesParsed = 0L;
-    } catch (DatasetNotFoundException ex) {
-      throw new EventDeliveryException("Dataset " + datasetUri + " not found."
-          + " The dataset must be created before Flume can write to it.", ex);
-    } catch (RuntimeException ex) {
-      throw new EventDeliveryException("Error trying to open a new"
-          + " writer for dataset " + datasetUri, ex);
-    }
-  }
-
-  /**
-   * Return true if the sink should roll the writer.
-   *
-   * Currently, this is based on time since the last roll or if the current
-   * writer is null.
-   *
-   * @return True if and only if the sink should roll the writer
-   */
-  private boolean shouldRoll() {
-    long currentTimeMillis = System.currentTimeMillis();
-    long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(
-        currentTimeMillis - lastRolledMillis);
-
-    LOG.debug("Current time: {}, lastRolled: {}, diff: {} sec",
-        new Object[] {currentTimeMillis, lastRolledMillis, 
elapsedTimeSeconds});
-
-    return elapsedTimeSeconds >= rollIntervalSeconds || writer == null;
-  }
-
-  /**
-   * Close the current writer.
-   *
-   * This method always sets the current writer to null even if close fails.
-   * If this method throws an Exception, callers *must* rollback any active
-   * transaction to ensure that data is replayed.
-   *
-   * @throws EventDeliveryException
-   */
-  @VisibleForTesting
-  void closeWriter() throws EventDeliveryException {
-    if (writer != null) {
-      try {
-        writer.close();
-
-        long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(
-            System.currentTimeMillis() - lastRolledMillis);
-        LOG.info("Closed writer for {} after {} seconds and {} bytes parsed",
-            new Object[]{datasetUri, elapsedTimeSeconds, bytesParsed});
-      } catch (DatasetIOException ex) {
-        throw new EventDeliveryException("Check HDFS permissions/health. IO"
-            + " error trying to close the  writer for dataset " + datasetUri,
-            ex);
-      } catch (RuntimeException ex) {
-        throw new EventDeliveryException("Error trying to close the  writer 
for"
-            + " dataset " + datasetUri, ex);
-      } finally {
-        // If we failed to close the writer then we give up on it as we'll
-        // end up throwing an EventDeliveryException which will result in
-        // a transaction rollback and a replay of any events written during
-        // the current transaction. If commitOnBatch is true, you can still
-        // end up with orphaned temp files that have data to be recovered.
-        this.writer = null;
-        failurePolicy.close();
-      }
-    }
-  }
-
-  /**
-   * Enter the transaction boundary. This will either begin a new transaction
-   * if one didn't already exist. If we're already in a transaction boundary,
-   * then this method does nothing.
-   *
-   * @param channel The Sink's channel
-   * @throws EventDeliveryException There was an error starting a new batch
-   *                                with the failure policy.
-   */
-  private void enterTransaction(Channel channel) throws EventDeliveryException 
{
-    // There's no synchronization around the transaction instance because the
-    // Sink API states "the Sink#process() call is guaranteed to only
-    // be accessed  by a single thread". Technically other methods could be
-    // called concurrently, but the implementation of SinkRunner waits
-    // for the Thread running process() to end before calling stop()
-    if (transaction == null) {
-      this.transaction = channel.getTransaction();
-      transaction.begin();
-      failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
-    }
-  }
-
-  /**
-   * Commit and close the transaction.
-   *
-   * If this method throws an Exception the caller *must* ensure that the
-   * transaction is rolled back. Callers can roll back the transaction by
-   * calling {@link #rollbackTransaction()}.
-   *
-   * @return True if there was an open transaction and it was committed, false
-   *         otherwise.
-   * @throws EventDeliveryException There was an error ending the batch with
-   *                                the failure policy.
-   */
-  @VisibleForTesting
-  boolean commitTransaction() throws EventDeliveryException {
-    if (transaction != null) {
-      failurePolicy.sync();
-      transaction.commit();
-      transaction.close();
-      this.transaction = null;
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Rollback the transaction. If there is a RuntimeException during rollback,
-   * it will be logged but the transaction instance variable will still be
-   * nullified.
-   */
-  private void rollbackTransaction() {
-    if (transaction != null) {
-      try {
-        // If the transaction wasn't committed before we got the exception, we
-        // need to rollback.
-        transaction.rollback();
-      } catch (RuntimeException ex) {
-        LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage());
-        LOG.debug("Exception follows.", ex);
-      } finally {
-        transaction.close();
-        this.transaction = null;
-      }
-    }
-  }
-
-  /**
-   * Get the name of the dataset from the URI
-   *
-   * @param uri The dataset or view URI
-   * @return The dataset name
-   */
-  private static String uriToName(URI uri) {
-    return Registration.lookupDatasetUri(URI.create(
-        uri.getRawSchemeSpecificPart())).second().get("dataset");
-  }
-
-  @Override
-  public long getBatchSize() {
-    return batchSize;
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
deleted file mode 100644
index af33304..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-import org.kitesdk.data.URIBuilder;
-
-public class DatasetSinkConstants {
-  /**
-   * URI of the Kite Dataset
-   */
-  public static final String CONFIG_KITE_DATASET_URI = "kite.dataset.uri";
-
-  /**
-   * URI of the Kite DatasetRepository.
-   */
-  public static final String CONFIG_KITE_REPO_URI = "kite.repo.uri";
-
-  /**
-   * Name of the Kite Dataset to write into.
-   */
-  public static final String CONFIG_KITE_DATASET_NAME = "kite.dataset.name";
-
-  /**
-   * Namespace of the Kite Dataset to write into.
-   */
-  public static final String CONFIG_KITE_DATASET_NAMESPACE =
-      "kite.dataset.namespace";
-  public static final String DEFAULT_NAMESPACE = URIBuilder.NAMESPACE_DEFAULT;
-
-  /**
-   * Number of records to process from the incoming channel per call to 
process.
-   */
-  public static final String CONFIG_KITE_BATCH_SIZE = "kite.batchSize";
-  public static long DEFAULT_BATCH_SIZE = 100;
-
-  /**
-   * Maximum time to wait before finishing files.
-   */
-  public static final String CONFIG_KITE_ROLL_INTERVAL = "kite.rollInterval";
-  public static int DEFAULT_ROLL_INTERVAL = 30; // seconds
-
-  /**
-   * Flag for committing the Flume transaction on each batch for Flushable
-   * datasets. When set to false, Flume will only commit the transaction when
-   * roll interval has expired. Setting this to false requires enough space
-   * in the channel to handle all events delivered during the roll interval.
-   * Defaults to true.
-   */
-  public static final String CONFIG_FLUSHABLE_COMMIT_ON_BATCH =
-      "kite.flushable.commiteOnBatch";
-  public static boolean DEFAULT_FLUSHABLE_COMMIT_ON_BATCH = true;
-
-  /**
-   * Flag for syncing the DatasetWriter on each batch for Syncable
-   * datasets. Defaults to true.
-   */
-  public static final String CONFIG_SYNCABLE_SYNC_ON_BATCH =
-      "kite.syncable.syncOnBatch";
-  public static boolean DEFAULT_SYNCABLE_SYNC_ON_BATCH = true;
-
-  /**
-   * Parser used to parse Flume Events into Kite entities.
-   */
-  public static final String CONFIG_ENTITY_PARSER = "kite.entityParser";
-
-  /**
-   * Built-in entity parsers
-   */
-  public static final String AVRO_ENTITY_PARSER = "avro";
-  public static final String DEFAULT_ENTITY_PARSER = AVRO_ENTITY_PARSER;
-  public static final String[] AVAILABLE_PARSERS = new String[] {
-    AVRO_ENTITY_PARSER
-  };
-
-  /**
-   * Policy used to handle non-recoverable failures.
-   */
-  public static final String CONFIG_FAILURE_POLICY = "kite.failurePolicy";
-
-  /**
-   * Write non-recoverable Flume events to a Kite dataset.
-   */
-  public static final String SAVE_FAILURE_POLICY = "save";
-
-  /**
-   * The URI to write non-recoverable Flume events to in the case of an error.
-   * If the dataset doesn't exist, it will be created.
-   */
-  public static final String CONFIG_KITE_ERROR_DATASET_URI =
-      "kite.error.dataset.uri";
-
-  /**
-   * Retry non-recoverable Flume events. This will lead to a never ending cycle
-   * of failure, but matches the previous default semantics of the DatasetSink.
-   */
-  public static final String RETRY_FAILURE_POLICY = "retry";
-  public static final String DEFAULT_FAILURE_POLICY = RETRY_FAILURE_POLICY;
-  public static final String[] AVAILABLE_POLICIES = new String[] {
-    RETRY_FAILURE_POLICY,
-    SAVE_FAILURE_POLICY
-  };
-
-  /**
-   * Headers where avro schema information is expected.
-   */
-  public static final String AVRO_SCHEMA_LITERAL_HEADER =
-      "flume.avro.schema.literal";
-  public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url";
-
-  /**
-   * Hadoop authentication settings
-   */
-  public static final String AUTH_PROXY_USER = "auth.proxyUser";
-  public static final String AUTH_PRINCIPAL = "auth.kerberosPrincipal";
-  public static final String AUTH_KEYTAB = "auth.kerberosKeytab";
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
deleted file mode 100644
index 991869a..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-/**
- * A non-recoverable error trying to deliver the event.
- *
- * Non-recoverable event delivery failures include:
- *
- * 1. Error parsing the event body thrown from the {@link EntityParser}
- * 2. A schema mismatch between the schema of an event and the schema of the
- *    destination dataset.
- * 3. A missing schema from the Event header when using the
- *    {@link AvroEntityParser}.
- */
-public class NonRecoverableEventException extends Exception {
-
-  private static final long serialVersionUID = 3485151222482254285L;
-
-  public NonRecoverableEventException() {
-    super();
-  }
-
-  public NonRecoverableEventException(String message) {
-    super(message);
-  }
-
-  public NonRecoverableEventException(String message, Throwable t) {
-    super(message, t);
-  }
-
-  public NonRecoverableEventException(Throwable t) {
-    super(t);
-  }
-
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
deleted file mode 100644
index 7c6a723..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.parser;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.sink.kite.NonRecoverableEventException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-/**
- * An {@link EntityParser} that parses Avro serialized bytes from an event.
- * 
- * The Avro schema used to serialize the data should be set as either a URL
- * or literal in the flume.avro.schema.url or flume.avro.schema.literal event
- * headers respectively.
- */
-public class AvroParser implements EntityParser<GenericRecord> {
-
-  static Configuration conf = new Configuration();
-
-  /**
-   * A cache of literal schemas to avoid re-parsing the schema.
-   */
-  private static final LoadingCache<String, Schema> schemasFromLiteral =
-      CacheBuilder.newBuilder()
-      .build(new CacheLoader<String, Schema>() {
-        @Override
-        public Schema load(String literal) {
-          Preconditions.checkNotNull(literal,
-              "Schema literal cannot be null without a Schema URL");
-          return new Schema.Parser().parse(literal);
-        }
-      });
-
-  /**
-   * A cache of schemas retrieved by URL to avoid re-parsing the schema.
-   */
-  private static final LoadingCache<String, Schema> schemasFromURL =
-      CacheBuilder.newBuilder()
-      .build(new CacheLoader<String, Schema>() {
-        @Override
-        public Schema load(String url) throws IOException {
-          Schema.Parser parser = new Schema.Parser();
-          InputStream is = null;
-          try {
-            FileSystem fs = FileSystem.get(URI.create(url), conf);
-            if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
-              is = fs.open(new Path(url));
-            } else {
-              is = new URL(url).openStream();
-            }
-            return parser.parse(is);
-          } finally {
-            if (is != null) {
-              is.close();
-            }
-          }
-        }
-      });
-
-  /**
-   * The schema of the destination dataset.
-   * 
-   * Used as the reader schema during parsing.
-   */
-  private final Schema datasetSchema;
-
-  /**
-   * A cache of DatumReaders per schema.
-   */
-  private final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
-      CacheBuilder.newBuilder()
-          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
-            @Override
-            public DatumReader<GenericRecord> load(Schema schema) {
-              // must use the target dataset's schema for reading to ensure the
-              // records are able to be stored using it
-              return new GenericDatumReader<GenericRecord>(
-                  schema, datasetSchema);
-            }
-          });
-
-  /**
-   * The binary decoder to reuse for event parsing.
-   */
-  private BinaryDecoder decoder = null;
-
-  /**
-   * Create a new AvroParser given the schema of the destination dataset.
-   * 
-   * @param datasetSchema The schema of the destination dataset.
-   */
-  private AvroParser(Schema datasetSchema) {
-    this.datasetSchema = datasetSchema;
-  }
-
-  /**
-   * Parse the entity from the body of the given event.
-   * 
-   * @param event The event to parse.
-   * @param reuse If non-null, this may be reused and returned from this 
method.
-   * @return The parsed entity as a GenericRecord.
-   * @throws EventDeliveryException A recoverable error such as an error
-   *                                downloading the schema from the URL has
-   *                                occurred.
-   * @throws NonRecoverableEventException A non-recoverable error such as an
-   *                                      unparsable schema or entity has
-   *                                      occurred.
-   */
-  @Override
-  public GenericRecord parse(Event event, GenericRecord reuse)
-      throws EventDeliveryException, NonRecoverableEventException {
-    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
-
-    try {
-      DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event));
-      return reader.read(reuse, decoder);
-    } catch (IOException ex) {
-      throw new NonRecoverableEventException("Cannot deserialize event", ex);
-    } catch (RuntimeException ex) {
-      throw new NonRecoverableEventException("Cannot deserialize event", ex);
-    }
-  }
-
-  /**
-   * Get the schema from the event headers.
-   * 
-   * @param event The Flume event
-   * @return The schema for the event
-   * @throws EventDeliveryException A recoverable error such as an error
-   *                                downloading the schema from the URL has
-   *                                occurred.
-   * @throws NonRecoverableEventException A non-recoverable error such as an
-   *                                      unparsable schema has occurred.
-   */
-  private static Schema schema(Event event) throws EventDeliveryException,
-      NonRecoverableEventException {
-    Map<String, String> headers = event.getHeaders();
-    String schemaURL = headers.get(AVRO_SCHEMA_URL_HEADER);
-    try {
-      if (schemaURL != null) {
-        return schemasFromURL.get(schemaURL);
-      } else {
-        String schemaLiteral = headers.get(AVRO_SCHEMA_LITERAL_HEADER);
-        if (schemaLiteral == null) {
-          throw new NonRecoverableEventException("No schema in event headers."
-              + " Headers must include either " + AVRO_SCHEMA_URL_HEADER
-              + " or " + AVRO_SCHEMA_LITERAL_HEADER);
-        }
-
-        return schemasFromLiteral.get(schemaLiteral);
-      }
-    } catch (ExecutionException ex) {
-      throw new EventDeliveryException("Cannot get schema", ex.getCause());
-    } catch (UncheckedExecutionException ex) {
-      throw new NonRecoverableEventException("Cannot parse schema",
-          ex.getCause());
-    }
-  }
-
-  public static class Builder implements EntityParser.Builder<GenericRecord> {
-
-    @Override
-    public EntityParser<GenericRecord> build(Schema datasetSchema, Context 
config) {
-      return new AvroParser(datasetSchema);
-    }
-    
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
deleted file mode 100644
index f2051a2..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.parser;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.avro.Schema;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.sink.kite.NonRecoverableEventException;
-
-@NotThreadSafe
-public interface EntityParser<E> {
-
-  /**
-   * Parse a Kite entity from a Flume event
-   *
-   * @param event The event to parse
-   * @param reuse If non-null, this may be reused and returned
-   * @return The parsed entity
-   * @throws EventDeliveryException A recoverable error during parsing. Parsing
-   *                                can be safely retried.
-   * @throws NonRecoverableEventException A non-recoverable error during
-   *                                      parsing. The event must be discarded.
-   *                                    
-   */
-  public E parse(Event event, E reuse) throws EventDeliveryException,
-      NonRecoverableEventException;
-
-  /**
-   * Knows how to build {@code EntityParser}s. Implementers must provide a
-   * no-arg constructor.
-   * 
-   * @param <E> The type of entities generated
-   */
-  public static interface Builder<E> {
-
-    public EntityParser<E> build(Schema datasetSchema, Context config);
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
deleted file mode 100644
index 3720ff3..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.parser;
-
-import java.util.Arrays;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flume.Context;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-public class EntityParserFactory {
-
-  public EntityParser<GenericRecord> newParser(Schema datasetSchema, Context 
config) {
-    EntityParser<GenericRecord> parser;
-
-    String parserType = config.getString(CONFIG_ENTITY_PARSER,
-        DEFAULT_ENTITY_PARSER);
-
-    if (parserType.equals(AVRO_ENTITY_PARSER)) {
-      parser = new AvroParser.Builder().build(datasetSchema, config);
-    } else {
-
-      Class<? extends EntityParser.Builder> builderClass;
-      Class c;
-      try {
-        c = Class.forName(parserType);
-      } catch (ClassNotFoundException ex) {
-        throw new IllegalArgumentException("EntityParser.Builder class "
-            + parserType + " not found. Must set " + CONFIG_ENTITY_PARSER
-            + " to a class that implements EntityParser.Builder or to a 
builtin"
-            + " parser: " + Arrays.toString(AVAILABLE_PARSERS), ex);
-      }
-
-      if (c != null && EntityParser.Builder.class.isAssignableFrom(c)) {
-        builderClass = c;
-      } else {
-        throw new IllegalArgumentException("Class " + parserType + " does not"
-            + " implement EntityParser.Builder. Must set "
-            + CONFIG_ENTITY_PARSER + " to a class that extends"
-            + " EntityParser.Builder or to a builtin parser: "
-            + Arrays.toString(AVAILABLE_PARSERS));
-      }
-
-      EntityParser.Builder<GenericRecord> builder;
-      try {
-        builder = builderClass.newInstance();
-      } catch (InstantiationException ex) {
-        throw new IllegalArgumentException("Can't instantiate class "
-            + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class"
-            + " that extends EntityParser.Builder or to a builtin parser: "
-            + Arrays.toString(AVAILABLE_PARSERS), ex);
-      } catch (IllegalAccessException ex) {
-        throw new IllegalArgumentException("Can't instantiate class "
-            + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class"
-            + " that extends EntityParser.Builder or to a builtin parser: "
-            + Arrays.toString(AVAILABLE_PARSERS), ex);
-      }
-
-      parser = builder.build(datasetSchema, config);
-    }
-
-    return parser;
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
deleted file mode 100644
index f6f875a..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.sink.kite.DatasetSink;
-import org.kitesdk.data.Syncable;
-
-/**
- * A policy for dealing with non-recoverable event delivery failures.
- *
- * Non-recoverable event delivery failures include:
- *
- * 1. Error parsing the event body thrown from the {@link EntityParser}
- * 2. A schema mismatch between the schema of an event and the schema of the
- *    destination dataset.
- * 3. A missing schema from the Event header when using the
- *    {@link AvroEntityParser}.
- *
- * The life cycle of a FailurePolicy mimics the life cycle of the
- * {@link DatasetSink#writer}:
- *
- * 1. When a new writer is created, the policy will be instantiated.
- * 2. As Event failures happen,
- *    {@link #handle(org.apache.flume.Event, java.lang.Throwable)} will be
- *    called to let the policy handle the failure.
- * 3. If the {@link DatasetSink} is configured to commit on batch, then the
- *    {@link #sync()} method will be called when the batch is committed.
- * 4. When the writer is closed, the policy's {@link #close()} method will be
- *    called.
- */
-public interface FailurePolicy {
-
-  /**
-   * Handle a non-recoverable event.
-   *
-   * @param event The event
-   * @param cause The cause of the failure
-   * @throws EventDeliveryException The policy failed to handle the event. When
-   *                                this is thrown, the Flume transaction will
-   *                                be rolled back and the event will be 
retried
-   *                                along with the rest of the batch.
-   */
-  public void handle(Event event, Throwable cause)
-      throws EventDeliveryException;
-
-  /**
-   * Ensure any handled events are on stable storage.
-   *
-   * This allows the policy implementation to sync any data that it may not
-   * have fully handled.
-   *
-   * See {@link Syncable#sync()}.
-   *
-   * @throws EventDeliveryException The policy failed while syncing data.
-   *                                When this is thrown, the Flume transaction
-   *                                will be rolled back and the batch will be
-   *                                retried.
-   */
-  public void sync() throws EventDeliveryException;
-
-  /**
-   * Close this FailurePolicy and release any resources.
-   *
-   * @throws EventDeliveryException The policy failed while closing resources.
-   *                                When this is thrown, the Flume transaction
-   *                                will be rolled back and the batch will be
-   *                                retried.
-   */
-  public void close() throws EventDeliveryException;
-
-  /**
-   * Knows how to build {@code FailurePolicy}s. Implementers must provide a
-   * no-arg constructor.
-   */
-  public static interface Builder {
-
-    /**
-     * Build a new {@code FailurePolicy}
-     *
-     * @param config The Flume configuration context
-     * @return The {@code FailurePolicy}
-     */
-    FailurePolicy build(Context config);
-  }
-
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
deleted file mode 100644
index d3b1fe8..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import java.util.Arrays;
-import org.apache.flume.Context;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-public class FailurePolicyFactory {
-
-  public FailurePolicy newPolicy(Context config) {
-    FailurePolicy policy;
-
-    String policyType = config.getString(CONFIG_FAILURE_POLICY,
-        DEFAULT_FAILURE_POLICY);
-
-    if (policyType.equals(RETRY_FAILURE_POLICY)) {
-      policy = new RetryPolicy.Builder().build(config);
-    } else if (policyType.equals(SAVE_FAILURE_POLICY)) {
-      policy = new SavePolicy.Builder().build(config);
-    } else {
-
-      Class<? extends FailurePolicy.Builder> builderClass;
-      Class c;
-      try {
-        c = Class.forName(policyType);
-      } catch (ClassNotFoundException ex) {
-        throw new IllegalArgumentException("FailurePolicy.Builder class "
-            + policyType + " not found. Must set " + CONFIG_FAILURE_POLICY
-            + " to a class that implements FailurePolicy.Builder or to a 
builtin"
-            + " policy: " + Arrays.toString(AVAILABLE_POLICIES), ex);
-      }
-
-      if (c != null && FailurePolicy.Builder.class.isAssignableFrom(c)) {
-        builderClass = c;
-      } else {
-        throw new IllegalArgumentException("Class " + policyType + " does not"
-            + " implement FailurePolicy.Builder. Must set "
-            + CONFIG_FAILURE_POLICY + " to a class that extends"
-            + " FailurePolicy.Builder or to a builtin policy: "
-            + Arrays.toString(AVAILABLE_POLICIES));
-      }
-
-      FailurePolicy.Builder builder;
-      try {
-        builder = builderClass.newInstance();
-      } catch (InstantiationException ex) {
-        throw new IllegalArgumentException("Can't instantiate class "
-            + policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a 
class"
-            + " that extends FailurePolicy.Builder or to a builtin policy: "
-            + Arrays.toString(AVAILABLE_POLICIES), ex);
-      } catch (IllegalAccessException ex) {
-        throw new IllegalArgumentException("Can't instantiate class "
-            + policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a 
class"
-            + " that extends FailurePolicy.Builder or to a builtin policy: "
-            + Arrays.toString(AVAILABLE_POLICIES), ex);
-      }
-
-      policy = builder.build(config);
-    }
-   
-    return policy;
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
deleted file mode 100644
index 9a4991c..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A failure policy that logs the error and then forces a retry by throwing
- * {@link EventDeliveryException}.
- */
-public class RetryPolicy implements FailurePolicy {
-  private static final Logger LOG = LoggerFactory.getLogger(RetryPolicy.class);
-
-  private RetryPolicy() {
-  }
-
-  @Override
-  public void handle(Event event, Throwable cause) throws 
EventDeliveryException {
-    LOG.error("Event delivery failed: " + cause.getLocalizedMessage());
-    LOG.debug("Exception follows.", cause);
-
-    throw new EventDeliveryException(cause);
-  }
-
-  @Override
-  public void sync() throws EventDeliveryException {
-    // do nothing
-  }
-
-  @Override
-  public void close() throws EventDeliveryException {
-    // do nothing
-  }
-
-  public static class Builder implements FailurePolicy.Builder {
-
-    @Override
-    public FailurePolicy build(Context config) {
-      return new RetryPolicy();
-    }
-
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
deleted file mode 100644
index bd537ec..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.Syncable;
-import org.kitesdk.data.View;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-/**
- * A failure policy that writes the raw Flume event to a Kite dataset.
- */
-public class SavePolicy implements FailurePolicy {
-
-  private final View<AvroFlumeEvent> dataset;
-  private DatasetWriter<AvroFlumeEvent> writer;
-  private int nEventsHandled;
-
-  private SavePolicy(Context context) {
-    String uri = context.getString(CONFIG_KITE_ERROR_DATASET_URI);
-    Preconditions.checkArgument(uri != null, "Must set "
-        + CONFIG_KITE_ERROR_DATASET_URI + " when " + CONFIG_FAILURE_POLICY
-        + "=save");
-    if (Datasets.exists(uri)) {
-      dataset = Datasets.load(uri, AvroFlumeEvent.class);
-    } else {
-      DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-          .schema(AvroFlumeEvent.class)
-          .build();
-      dataset = Datasets.create(uri, descriptor, AvroFlumeEvent.class);
-    }
-
-    nEventsHandled = 0;
-  }
-
-  @Override
-  public void handle(Event event, Throwable cause) throws 
EventDeliveryException {
-    try {
-      if (writer == null) {
-        writer = dataset.newWriter();
-      }
-
-      final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-      avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
-      avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));
-
-      writer.write(avroEvent);
-      nEventsHandled++;
-    } catch (RuntimeException ex) {
-      throw new EventDeliveryException(ex);
-    }
-  }
-
-  @Override
-  public void sync() throws EventDeliveryException {
-    if (nEventsHandled > 0) {
-      if (Formats.PARQUET.equals(
-          dataset.getDataset().getDescriptor().getFormat())) {
-        // We need to close the writer on sync if we're writing to a Parquet
-        // dataset
-        close();
-      } else {
-        if (writer instanceof Syncable) {
-          ((Syncable) writer).sync();
-        }
-      }
-    }
-  }
-
-  @Override
-  public void close() throws EventDeliveryException {
-    if (nEventsHandled > 0) {
-      try {
-        writer.close();
-      } catch (RuntimeException ex) {
-        throw new EventDeliveryException(ex);
-      } finally {
-        writer = null;
-        nEventsHandled = 0;
-      }
-    }
-  }
-
-  /**
-   * Helper function to convert a map of String to a map of CharSequence.
-   */
-  private static Map<CharSequence, CharSequence> toCharSeqMap(
-      Map<String, String> map) {
-    return Maps.<CharSequence, CharSequence>newHashMap(map);
-  }
-
-  public static class Builder implements FailurePolicy.Builder {
-
-    @Override
-    public FailurePolicy build(Context config) {
-      return new SavePolicy(config);
-    }
-
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
 
b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
deleted file mode 100644
index 3709577..0000000
--- 
a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ /dev/null
@@ -1,1036 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.flume.sink.kite.parser.EntityParser;
-import org.apache.flume.sink.kite.policy.FailurePolicy;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.PartitionStrategy;
-import org.kitesdk.data.View;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class TestDatasetSink {
-
-  public static final String FILE_REPO_URI = "repo:file:target/test_repo";
-  public static final String DATASET_NAME = "test";
-  public static final String FILE_DATASET_URI =
-      "dataset:file:target/test_repo/" + DATASET_NAME;
-  public static final String ERROR_DATASET_URI =
-      "dataset:file:target/test_repo/failed_events";
-  public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
-  public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
-      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
-          "{\"name\":\"id\",\"type\":\"string\"}," +
-          "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," +
-              "\"default\":\"default\"}]}");
-  public static final Schema COMPATIBLE_SCHEMA = new Schema.Parser().parse(
-      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
-          "{\"name\":\"id\",\"type\":\"string\"}]}");
-  public static final Schema INCOMPATIBLE_SCHEMA = new Schema.Parser().parse(
-      "{\"type\":\"record\",\"name\":\"user\",\"fields\":[" +
-          "{\"name\":\"username\",\"type\":\"string\"}]}");
-  public static final Schema UPDATED_SCHEMA = new Schema.Parser().parse(
-      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
-          "{\"name\":\"id\",\"type\":\"string\"}," +
-          "{\"name\":\"priority\",\"type\":\"int\", \"default\": 0}," +
-          "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," +
-          "\"default\":\"default\"}]}");
-  public static final DatasetDescriptor DESCRIPTOR = new DatasetDescriptor
-      .Builder()
-      .schema(RECORD_SCHEMA)
-      .build();
-
-  Context config = null;
-  Channel in = null;
-  List<GenericRecord> expected = null;
-  private static final String DFS_DIR = "target/test/dfs";
-  private static final String TEST_BUILD_DATA_KEY = "test.build.data";
-  private static String oldTestBuildDataProp = null;
-
-  @BeforeClass
-  public static void saveSchema() throws IOException {
-    oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY);
-    System.setProperty(TEST_BUILD_DATA_KEY, DFS_DIR);
-    FileWriter schema = new FileWriter(SCHEMA_FILE);
-    schema.append(RECORD_SCHEMA.toString());
-    schema.close();
-  }
-
-  @AfterClass
-  public static void tearDownClass() {
-    FileUtils.deleteQuietly(new File(DFS_DIR));
-    if (oldTestBuildDataProp != null) {
-      System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp);
-    }
-  }
-
-  @Before
-  public void setup() throws EventDeliveryException {
-    Datasets.delete(FILE_DATASET_URI);
-    Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
-
-    this.config = new Context();
-    config.put("keep-alive", "0");
-    this.in = new MemoryChannel();
-    Configurables.configure(in, config);
-
-    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
-    expected = Lists.<GenericRecord>newArrayList(
-        builder.set("id", "1").set("msg", "msg1").build(),
-        builder.set("id", "2").set("msg", "msg2").build(),
-        builder.set("id", "3").set("msg", "msg3").build());
-
-    putToChannel(in, Iterables.transform(expected,
-        new Function<GenericRecord, Event>() {
-          private int i = 0;
-
-          @Override
-          public Event apply(@Nullable GenericRecord rec) {
-            this.i += 1;
-            boolean useURI = (i % 2) == 0;
-            return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI);
-          }
-        }));
-  }
-
-  @After
-  public void teardown() {
-    Datasets.delete(FILE_DATASET_URI);
-  }
-
-  @Test
-  public void testOldConfig() throws EventDeliveryException {
-    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, null);
-    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI);
-    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME);
-
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testDatasetUriOverridesOldConfig() throws EventDeliveryException 
{
-    // CONFIG_KITE_DATASET_URI is still set, otherwise this will cause an error
-    config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, "bad uri");
-    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "");
-
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testFileStore()
-      throws EventDeliveryException, NonRecoverableEventException, 
NonRecoverableEventException {
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testParquetDataset() throws EventDeliveryException {
-    Datasets.delete(FILE_DATASET_URI);
-    Dataset<GenericRecord> created = Datasets.create(FILE_DATASET_URI,
-        new DatasetDescriptor.Builder(DESCRIPTOR)
-            .format("parquet")
-            .build());
-
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    // the transaction should not commit during the call to process
-    assertThrows("Transaction should still be open", 
IllegalStateException.class,
-        new Callable() {
-          @Override
-          public Object call() throws EventDeliveryException {
-            in.getTransaction().begin();
-            return null;
-          }
-        });
-    // The records won't commit until the call to stop()
-    Assert.assertEquals("Should not have committed", 0, read(created).size());
-
-    sink.stop();
-
-    Assert.assertEquals(Sets.newHashSet(expected), read(created));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testPartitionedData() throws EventDeliveryException {
-    URI partitionedUri = 
URI.create("dataset:file:target/test_repo/partitioned");
-    try {
-      Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR)
-          .partitionStrategy(new PartitionStrategy.Builder()
-              .identity("id", 10) // partition by id
-              .build())
-          .build());
-
-      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
-          partitionedUri.toString());
-      DatasetSink sink = sink(in, config);
-
-      // run the sink
-      sink.start();
-      sink.process();
-      sink.stop();
-
-      Assert.assertEquals(
-          Sets.newHashSet(expected),
-          read(Datasets.load(partitionedUri)));
-      Assert.assertEquals("Should have committed", 0, remaining(in));
-    } finally {
-      if (Datasets.exists(partitionedUri)) {
-        Datasets.delete(partitionedUri);
-      }
-    }
-  }
-
-  @Test
-  public void testStartBeforeDatasetCreated() throws EventDeliveryException {
-    // delete the dataset created by setup
-    Datasets.delete(FILE_DATASET_URI);
-
-    DatasetSink sink = sink(in, config);
-
-    // start the sink
-    sink.start();
-
-    // run the sink without a target dataset
-    try {
-      sink.process();
-      Assert.fail("Should have thrown an exception: no such dataset");
-    } catch (EventDeliveryException e) {
-      // expected
-    }
-
-    // create the target dataset
-    Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
-
-    // run the sink
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals(Sets.newHashSet(expected), 
read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testDatasetUpdate() throws EventDeliveryException {
-    // add an updated record that is missing the msg field
-    GenericRecordBuilder updatedBuilder = new 
GenericRecordBuilder(UPDATED_SCHEMA);
-    GenericData.Record updatedRecord = updatedBuilder
-        .set("id", "0")
-        .set("priority", 1)
-        .set("msg", "Priority 1 message!")
-        .build();
-
-    // make a set of the expected records with the new schema
-    Set<GenericRecord> expectedAsUpdated = Sets.newHashSet();
-    for (GenericRecord record : expected) {
-      expectedAsUpdated.add(updatedBuilder
-          .clear("priority")
-          .set("id", record.get("id"))
-          .set("msg", record.get("msg"))
-          .build());
-    }
-    expectedAsUpdated.add(updatedRecord);
-
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    // update the dataset's schema
-    DatasetDescriptor updated = new DatasetDescriptor
-        .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor())
-        .schema(UPDATED_SCHEMA)
-        .build();
-    Datasets.update(FILE_DATASET_URI, updated);
-
-    // trigger a roll on the next process call to refresh the writer
-    sink.roll();
-
-    // add the record to the incoming channel and the expected list
-    putToChannel(in, event(updatedRecord, UPDATED_SCHEMA, null, false));
-
-    // process events with the updated schema
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals(expectedAsUpdated, 
read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testMiniClusterStore() throws EventDeliveryException, 
IOException {
-    // setup a minicluster
-    MiniDFSCluster cluster = new MiniDFSCluster
-        .Builder(new Configuration())
-        .build();
-
-    FileSystem dfs = cluster.getFileSystem();
-    Configuration conf = dfs.getConf();
-
-    URI hdfsUri = URI.create(
-        "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo" + DATASET_NAME);
-    try {
-      // create a repository and dataset in HDFS
-      Datasets.create(hdfsUri, DESCRIPTOR);
-
-      // update the config to use the HDFS repository
-      config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, 
hdfsUri.toString());
-
-      DatasetSink sink = sink(in, config);
-
-      // run the sink
-      sink.start();
-      sink.process();
-      sink.stop();
-
-      Assert.assertEquals(
-          Sets.newHashSet(expected),
-          read(Datasets.load(hdfsUri)));
-      Assert.assertEquals("Should have committed", 0, remaining(in));
-
-    } finally {
-      if (Datasets.exists(hdfsUri)) {
-        Datasets.delete(hdfsUri);
-      }
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testBatchSize() throws EventDeliveryException {
-    DatasetSink sink = sink(in, config);
-
-    // release one record per process call
-    config.put("kite.batchSize", "2");
-    Configurables.configure(sink, config);
-
-    sink.start();
-    sink.process(); // process the first and second
-    sink.roll(); // roll at the next process call
-    sink.process(); // roll and process the third
-    Assert.assertEquals(
-        Sets.newHashSet(expected.subList(0, 2)),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-    sink.roll(); // roll at the next process call
-    sink.process(); // roll, the channel is empty
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    sink.stop();
-  }
-
-  @Test
-  public void testTimedFileRolling()
-      throws EventDeliveryException, InterruptedException {
-    // use a new roll interval
-    config.put("kite.rollInterval", "1"); // in seconds
-
-    DatasetSink sink = sink(in, config);
-
-    Dataset<GenericRecord> records = Datasets.load(FILE_DATASET_URI);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-
-    Thread.sleep(1100); // sleep longer than the roll interval
-    sink.process(); // rolling happens in the process method
-
-    Assert.assertEquals(Sets.newHashSet(expected), read(records));
-
-    // wait until the end to stop because it would close the files
-    sink.stop();
-  }
-
-  @Test
-  public void testCompatibleSchemas() throws EventDeliveryException {
-    DatasetSink sink = sink(in, config);
-
-    // add a compatible record that is missing the msg field
-    GenericRecordBuilder compatBuilder = new GenericRecordBuilder(
-        COMPATIBLE_SCHEMA);
-    GenericData.Record compatibleRecord = compatBuilder.set("id", "0").build();
-
-    // add the record to the incoming channel
-    putToChannel(in, event(compatibleRecord, COMPATIBLE_SCHEMA, null, false));
-
-    // the record will be read using the real schema, so create the expected
-    // record using it, but without any data
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
-    GenericData.Record expectedRecord = builder.set("id", "0").build();
-    expected.add(expectedRecord);
-
-    // run the sink
-    sink.start();
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testIncompatibleSchemas() throws EventDeliveryException {
-    final DatasetSink sink = sink(in, config);
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(
-        INCOMPATIBLE_SCHEMA);
-    GenericData.Record rec = builder.set("username", "koala").build();
-    putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, null, false));
-
-    // run the sink
-    sink.start();
-    assertThrows("Should fail", EventDeliveryException.class,
-        new Callable() {
-          @Override
-          public Object call() throws EventDeliveryException {
-            sink.process();
-            return null;
-          }
-        });
-    sink.stop();
-
-    Assert.assertEquals("Should have rolled back",
-        expected.size() + 1, remaining(in));
-  }
-
-  @Test
-  public void testMissingSchema() throws EventDeliveryException {
-    final DatasetSink sink = sink(in, config);
-
-    Event badEvent = new SimpleEvent();
-    badEvent.setHeaders(Maps.<String, String>newHashMap());
-    badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
-    putToChannel(in, badEvent);
-
-    // run the sink
-    sink.start();
-    assertThrows("Should fail", EventDeliveryException.class,
-        new Callable() {
-          @Override
-          public Object call() throws EventDeliveryException {
-            sink.process();
-            return null;
-          }
-        });
-    sink.stop();
-
-    Assert.assertEquals("Should have rolled back",
-        expected.size() + 1, remaining(in));
-  }
-
-  @Test
-  public void testFileStoreWithSavePolicy() throws EventDeliveryException {
-    if (Datasets.exists(ERROR_DATASET_URI)) {
-      Datasets.delete(ERROR_DATASET_URI);
-    }
-    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
-        DatasetSinkConstants.SAVE_FAILURE_POLICY);
-    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
-        ERROR_DATASET_URI);
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testMissingSchemaWithSavePolicy() throws EventDeliveryException {
-    if (Datasets.exists(ERROR_DATASET_URI)) {
-      Datasets.delete(ERROR_DATASET_URI);
-    }
-    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
-        DatasetSinkConstants.SAVE_FAILURE_POLICY);
-    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
-        ERROR_DATASET_URI);
-    final DatasetSink sink = sink(in, config);
-
-    Event badEvent = new SimpleEvent();
-    badEvent.setHeaders(Maps.<String, String>newHashMap());
-    badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
-    putToChannel(in, badEvent);
-
-    // run the sink
-    sink.start();
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals("Good records should have been written",
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
-    Assert.assertEquals("Should have saved the bad event",
-        Sets.newHashSet(AvroFlumeEvent.newBuilder()
-          .setBody(ByteBuffer.wrap(badEvent.getBody()))
-          .setHeaders(toUtf8Map(badEvent.getHeaders()))
-          .build()),
-        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
-  }
-
-  @Test
-  public void testSerializedWithIncompatibleSchemasWithSavePolicy()
-      throws EventDeliveryException {
-    if (Datasets.exists(ERROR_DATASET_URI)) {
-      Datasets.delete(ERROR_DATASET_URI);
-    }
-    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
-        DatasetSinkConstants.SAVE_FAILURE_POLICY);
-    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
-        ERROR_DATASET_URI);
-    final DatasetSink sink = sink(in, config);
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(
-        INCOMPATIBLE_SCHEMA);
-    GenericData.Record rec = builder.set("username", "koala").build();
-
-    // We pass in a valid schema in the header, but an incompatible schema
-    // was used to serialize the record
-    Event badEvent = event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true);
-    putToChannel(in, badEvent);
-
-    // run the sink
-    sink.start();
-    sink.process();
-    sink.stop();
-
-    Assert.assertEquals("Good records should have been written",
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
-    Assert.assertEquals("Should have saved the bad event",
-        Sets.newHashSet(AvroFlumeEvent.newBuilder()
-          .setBody(ByteBuffer.wrap(badEvent.getBody()))
-          .setHeaders(toUtf8Map(badEvent.getHeaders()))
-          .build()),
-        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
-  }
-
-  @Test
-  public void testSerializedWithIncompatibleSchemas() throws 
EventDeliveryException {
-    final DatasetSink sink = sink(in, config);
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(
-        INCOMPATIBLE_SCHEMA);
-    GenericData.Record rec = builder.set("username", "koala").build();
-
-    // We pass in a valid schema in the header, but an incompatible schema
-    // was used to serialize the record
-    putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true));
-
-    // run the sink
-    sink.start();
-    assertThrows("Should fail", EventDeliveryException.class,
-        new Callable() {
-          @Override
-          public Object call() throws EventDeliveryException {
-            sink.process();
-            return null;
-          }
-        });
-    sink.stop();
-
-    Assert.assertEquals("Should have rolled back",
-        expected.size() + 1, remaining(in));
-  }
-
-  @Test
-  public void testCommitOnBatch() throws EventDeliveryException {
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    // the transaction should commit during the call to process
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-    // but the data won't be visible yet
-    Assert.assertEquals(0,
-        read(Datasets.load(FILE_DATASET_URI)).size());
-
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-  }
-
-  @Test
-  public void testCommitOnBatchFalse() throws EventDeliveryException {
-    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
-        Boolean.toString(false));
-    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
-        Boolean.toString(false));
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    // the transaction should not commit during the call to process
-    assertThrows("Transaction should still be open", 
IllegalStateException.class,
-        new Callable() {
-          @Override
-          public Object call() throws EventDeliveryException {
-            in.getTransaction().begin();
-            return null;
-          }
-        });
-
-    // the data won't be visible
-    Assert.assertEquals(0,
-        read(Datasets.load(FILE_DATASET_URI)).size());
-
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-    // the transaction should commit during the call to stop
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  @Test
-  public void testCommitOnBatchFalseSyncOnBatchTrue() throws 
EventDeliveryException {
-    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
-        Boolean.toString(false));
-    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
-        Boolean.toString(true));
-
-    try {
-      sink(in, config);
-      Assert.fail("Should have thrown IllegalArgumentException");
-    } catch (IllegalArgumentException ex) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testCloseAndCreateWriter() throws EventDeliveryException {
-    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
-        Boolean.toString(false));
-    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
-        Boolean.toString(false));
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    sink.closeWriter();
-    sink.commitTransaction();
-    sink.createWriter();
-
-    Assert.assertNotNull("Writer should not be null", sink.getWriter());
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-  }
-
-  @Test
-  public void testCloseWriter() throws EventDeliveryException {
-    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
-        Boolean.toString(false));
-    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
-        Boolean.toString(false));
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    sink.closeWriter();
-    sink.commitTransaction();
-
-    Assert.assertNull("Writer should be null", sink.getWriter());
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-
-    sink.stop();
-
-    Assert.assertEquals(
-        Sets.newHashSet(expected),
-        read(Datasets.load(FILE_DATASET_URI)));
-  }
-
-  @Test
-  public void testCreateWriter() throws EventDeliveryException {
-    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
-        Boolean.toString(false));
-    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
-        Boolean.toString(false));
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    sink.commitTransaction();
-    sink.createWriter();
-    Assert.assertNotNull("Writer should not be null", sink.getWriter());
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-
-    sink.stop();
-
-    Assert.assertEquals(0, read(Datasets.load(FILE_DATASET_URI)).size());
-  }
-
-  @Test
-  public void testAppendWriteExceptionInvokesPolicy()
-      throws EventDeliveryException, NonRecoverableEventException {
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    // Mock an Event
-    Event mockEvent = mock(Event.class);
-    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
-
-    // Mock a GenericRecord
-    GenericRecord mockRecord = mock(GenericRecord.class);
-
-    // Mock an EntityParser
-    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
-    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
-        .thenReturn(mockRecord);
-    sink.setParser(mockParser);
-
-    // Mock a FailurePolicy
-    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
-    sink.setFailurePolicy(mockFailurePolicy);
-
-    // Mock a DatasetWriter
-    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
-    doThrow(new DataFileWriter.AppendWriteException(new IOException()))
-        .when(mockWriter).write(mockRecord);
-
-    sink.setWriter(mockWriter);
-    sink.write(mockEvent);
-
-    // Verify that the event was sent to the failure policy
-    verify(mockFailurePolicy).handle(eq(mockEvent), any(Throwable.class));
-
-    sink.stop();
-  }
-
-  @Test
-  public void testRuntimeExceptionThrowsEventDeliveryException()
-      throws EventDeliveryException, NonRecoverableEventException {
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    // Mock an Event
-    Event mockEvent = mock(Event.class);
-    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
-
-    // Mock a GenericRecord
-    GenericRecord mockRecord = mock(GenericRecord.class);
-
-    // Mock an EntityParser
-    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
-    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
-        .thenReturn(mockRecord);
-    sink.setParser(mockParser);
-
-    // Mock a FailurePolicy
-    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
-    sink.setFailurePolicy(mockFailurePolicy);
-
-    // Mock a DatasetWriter
-    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
-    doThrow(new RuntimeException()).when(mockWriter).write(mockRecord);
-
-    sink.setWriter(mockWriter);
-
-    try {
-      sink.write(mockEvent);
-      Assert.fail("Should throw EventDeliveryException");
-    } catch (EventDeliveryException ex) {
-
-    }
-
-    // Verify that the event was not sent to the failure policy
-    verify(mockFailurePolicy, never()).handle(eq(mockEvent), 
any(Throwable.class));
-
-    sink.stop();
-  }
-
-  @Test
-  public void testProcessHandlesNullWriter() throws EventDeliveryException,
-      NonRecoverableEventException, NonRecoverableEventException {
-    DatasetSink sink = sink(in, config);
-
-    // run the sink
-    sink.start();
-    sink.process();
-
-    // explicitly set the writer to null
-    sink.setWriter(null);
-
-    // this should not throw an NPE
-    sink.process();
-
-    sink.stop();
-
-    Assert.assertEquals("Should have committed", 0, remaining(in));
-  }
-
-  public static DatasetSink sink(Channel in, Context config) {
-    DatasetSink sink = new DatasetSink();
-    sink.setChannel(in);
-    Configurables.configure(sink, config);
-    return sink;
-  }
-
-  public static <T> HashSet<T> read(View<T> view) {
-    DatasetReader<T> reader = null;
-    try {
-      reader = view.newReader();
-      return Sets.newHashSet(reader.iterator());
-    } finally {
-      if (reader != null) {
-        reader.close();
-      }
-    }
-  }
-
-  public static int remaining(Channel ch) throws EventDeliveryException {
-    Transaction t = ch.getTransaction();
-    try {
-      t.begin();
-      int count = 0;
-      while (ch.take() != null) {
-        count += 1;
-      }
-      t.commit();
-      return count;
-    } catch (Throwable th) {
-      t.rollback();
-      Throwables.propagateIfInstanceOf(th, Error.class);
-      Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
-      throw new EventDeliveryException(th);
-    } finally {
-      t.close();
-    }
-  }
-
-  public static void putToChannel(Channel in, Event... records)
-      throws EventDeliveryException {
-    putToChannel(in, Arrays.asList(records));
-  }
-
-  public static void putToChannel(Channel in, Iterable<Event> records)
-      throws EventDeliveryException {
-    Transaction t = in.getTransaction();
-    try {
-      t.begin();
-      for (Event record : records) {
-        in.put(record);
-      }
-      t.commit();
-    } catch (Throwable th) {
-      t.rollback();
-      Throwables.propagateIfInstanceOf(th, Error.class);
-      Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
-      throw new EventDeliveryException(th);
-    } finally {
-      t.close();
-    }
-  }
-
-  public static Event event(
-      Object datum, Schema schema, File file, boolean useURI) {
-    Map<String, String> headers = Maps.newHashMap();
-    if (useURI) {
-      headers.put(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER,
-          file.getAbsoluteFile().toURI().toString());
-    } else {
-      headers.put(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER,
-          schema.toString());
-    }
-    Event e = new SimpleEvent();
-    e.setBody(serialize(datum, schema));
-    e.setHeaders(headers);
-    return e;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static byte[] serialize(Object datum, Schema schema) {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-    ReflectDatumWriter writer = new ReflectDatumWriter(schema);
-    try {
-      writer.write(datum, encoder);
-      encoder.flush();
-    } catch (IOException ex) {
-      Throwables.propagate(ex);
-    }
-    return out.toByteArray();
-  }
-
-  /**
-   * A convenience method to avoid a large number of @Test(expected=...) tests.
-   *
-   * This variant uses a Callable, which is allowed to throw checked 
Exceptions.
-   *
-   * @param message A String message to describe this assertion
-   * @param expected An Exception class that the Runnable should throw
-   * @param callable A Callable that is expected to throw the exception
-   */
-  public static void assertThrows(
-      String message, Class<? extends Exception> expected, Callable callable) {
-    try {
-      callable.call();
-      Assert.fail("No exception was thrown (" + message + "), expected: " +
-          expected.getName());
-    } catch (Exception actual) {
-      Assert.assertEquals(message, expected, actual.getClass());
-    }
-  }
-
-  /**
-   * Helper function to convert a map of String to a map of Utf8.
-   *
-   * @param map A Map of String to String
-   * @return The same mappings converting the {@code String}s to {@link Utf8}s
-   */
-  public static Map<CharSequence, CharSequence> toUtf8Map(
-      Map<String, String> map) {
-    Map<CharSequence, CharSequence> utf8Map = Maps.newHashMap();
-    for (Map.Entry<String, String> entry : map.entrySet()) {
-      utf8Map.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
-    }
-    return utf8Map;
-  }
-}
diff --git 
a/flume-ng-sinks/flume-dataset-sink/src/test/resources/enable-kerberos.xml 
b/flume-ng-sinks/flume-dataset-sink/src/test/resources/enable-kerberos.xml
deleted file mode 100644
index 85b0447..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/test/resources/enable-kerberos.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Copyright 2014 Apache Software Foundation.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-<configuration>
-
-  <property>
-    <name>hadoop.security.authentication</name>
-    <value>kerberos</value>
-  </property>
-
-  <property>
-    <name>hadoop.security.authorization</name>
-    <value>true</value>
-  </property>
-
-</configuration>
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml 
b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
index 5f66e76..8e8ea83 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
@@ -60,11 +60,6 @@
     </dependency>
 
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml 
b/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
index ea6a93c..0a445a6 100644
--- a/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
@@ -156,11 +156,6 @@
     </dependency>
 
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index cb8a038..97dc0bd 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -55,6 +55,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -183,8 +184,15 @@ public class TestKafkaSink {
     ConsumerRecords recs = pollConsumerRecords(topic);
     assertNotNull(recs);
     assertTrue(recs.count() > 0);
-    ConsumerRecord consumerRecord = (ConsumerRecord) 
recs.records(topic).iterator().next();
-    assertEquals(msg, consumerRecord.value());
+    Iterator<ConsumerRecord> iter = recs.records(topic).iterator();
+    boolean match = false;
+    while (iter.hasNext()) {
+      if (msg.equals(iter.next().value())) {
+        match = true;
+        break;
+      }
+    }
+    assertTrue("No message matches " + msg, match);
   }
 
   @Test
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
index cdf9bad..1a87dc5 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -19,7 +19,7 @@
 package org.apache.flume.sink.kafka.util;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -165,14 +165,19 @@ public class TestUtil {
       NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
       newTopics.add(newTopic);
     }
-    getAdminClient().createTopics(newTopics);
-
-    //the following lines are a bit of black magic to ensure the topic is 
ready when we return
-    DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames);
-    try {
-      dtr.all().get(10, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      throw new RuntimeException("Error getting topic info", e);
+    CreateTopicsResult result = getAdminClient().createTopics(newTopics);
+    Throwable throwable = null;
+    for (int i = 0; i < 10; ++i) {
+      try {
+        result.all().get(1, TimeUnit.SECONDS);
+        throwable = null;
+        break;
+      } catch (Exception e) {
+        throwable = e;
+      }
+    }
+    if (throwable != null) {
+      throw new RuntimeException("Error getting topic info", throwable);
     }
   }
   public void deleteTopic(String topicName) {
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index ae9e306..2ecc140 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -43,7 +43,6 @@ limitations under the License.
     <module>flume-ng-morphline-solr-sink</module>
     <module>flume-ng-kafka-sink</module>
     <module>flume-http-sink</module>
-    <module>flume-dataset-sink</module>
     <module>flume-hive-sink</module>
   </modules>
 
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 56a582a..0799664 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -20,7 +20,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -141,14 +141,19 @@ public class KafkaSourceEmbeddedKafka {
   public void createTopic(String topicName, int numPartitions) {
     AdminClient adminClient = getAdminClient();
     NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
-    adminClient.createTopics(Collections.singletonList(newTopic));
-
-    //the following lines are a bit of black magic to ensure the topic is 
ready when we return
-    DescribeTopicsResult dtr = 
adminClient.describeTopics(Collections.singletonList(topicName));
-    try {
-      dtr.all().get(10, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      throw new RuntimeException("Error getting topic info", e);
+    CreateTopicsResult result = 
adminClient.createTopics(Collections.singletonList(newTopic));
+    Throwable throwable = null;
+    for (int i = 0; i < 10; ++i) {
+      try {
+        result.all().get(1, TimeUnit.SECONDS);
+        throwable = null;
+        break;
+      } catch (Exception e) {
+        throwable = e;
+      }
+    }
+    if (throwable != null) {
+      throw new RuntimeException("Error getting topic info", throwable);
     }
   }
 
diff --git a/pom.xml b/pom.xml
index 091d121..fa228bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@ limitations under the License.
 
     <activemq.version>5.7.0</activemq.version>
     <asynchbase.version>1.7.0</asynchbase.version>
-    <avro.version>1.7.7</avro.version>
+    <avro.version>1.9.2</avro.version>
     <bundle-plugin.version>2.3.7</bundle-plugin.version>
     <checkstyle.tool.version>8.45.1</checkstyle.tool.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
@@ -506,7 +506,7 @@ limitations under the License.
           <configuration>
             <reuseForks>false</reuseForks>
             <forkCount>1</forkCount>
-            <rerunFailingTestsCount>10</rerunFailingTestsCount>
+            <!--<rerunFailingTestsCount>10</rerunFailingTestsCount>-->
             <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
             <redirectTestOutputToFile>true</redirectTestOutputToFile>
             <argLine>-Djava.net.preferIPv4Stack=true</argLine>
@@ -827,14 +827,10 @@ limitations under the License.
 
       <dependency>
         <groupId>org.apache.avro</groupId>
-        <artifactId>avro-ipc</artifactId>
+        <artifactId>avro-ipc-netty</artifactId>
         <version>${avro.version}</version>
         <exclusions>
           <exclusion>
-            <groupId>org.mortbay.jetty</groupId>
-            <artifactId>servlet-api</artifactId>
-          </exclusion>
-          <exclusion>
             <groupId>io.netty</groupId>
             <artifactId>netty</artifactId>
           </exclusion>
@@ -842,6 +838,18 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-ipc-jetty</artifactId>
+        <version>${avro.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.mortbay.jetty</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.thrift</groupId>
         <artifactId>libthrift</artifactId>
         <version>${thrift.version}</version>
@@ -1363,53 +1371,6 @@ limitations under the License.
         <artifactId>scala-library</artifactId>
         <version>${scala-library.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.kitesdk</groupId>
-        <artifactId>kite-data-core</artifactId>
-        <version>${kite.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.kitesdk</groupId>
-        <artifactId>kite-data-hive</artifactId>
-        <version>${kite.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.kitesdk</groupId>
-        <artifactId>kite-data-hbase</artifactId>
-        <version>${kite.version}</version>
-      </dependency>
-
-      <!-- Dependency for kite-data-hive -->
-      <dependency>
-        <groupId>org.apache.hive</groupId>
-        <artifactId>hive-exec</artifactId>
-        <version>${hive.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hive</groupId>
-        <artifactId>hive-metastore</artifactId>
-        <version>${hive.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
 
       <dependency>
         <groupId>org.xerial.snappy</groupId>

Reply via email to