This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new a51d7a0 FLUME-3356 Upgrade Avro to 1.9.2
a51d7a0 is described below
commit a51d7a05e14381bcde73705d8c51df4bfd1919dc
Author: Ralph Goers <[email protected]>
AuthorDate: Wed Jan 12 16:10:23 2022 -0700
FLUME-3356 Upgrade Avro to 1.9.2
---
conf/log4j2.xml | 2 +-
flume-ng-core/pom.xml | 9 +-
.../java/org/apache/flume/source/AvroSource.java | 4 +-
.../java/org/apache/flume/sink/TestAvroSink.java | 46 +-
.../org/apache/flume/source/TestAvroSource.java | 9 +-
.../flume/agent/embedded/TestEmbeddedAgent.java | 8 +-
flume-ng-legacy-sources/flume-avro-source/pom.xml | 10 +-
.../flume/source/avroLegacy/AvroLegacySource.java | 12 +-
flume-ng-sdk/pom.xml | 11 +-
.../org/apache/flume/api/NettyAvroRpcClient.java | 11 +-
.../java/org/apache/flume/api/RpcTestUtils.java | 50 +-
flume-ng-sinks/flume-dataset-sink/pom.xml | 132 ---
.../org/apache/flume/sink/kite/DatasetSink.java | 588 -----------
.../flume/sink/kite/DatasetSinkConstants.java | 132 ---
.../sink/kite/NonRecoverableEventException.java | 52 -
.../apache/flume/sink/kite/parser/AvroParser.java | 208 ----
.../flume/sink/kite/parser/EntityParser.java | 56 --
.../sink/kite/parser/EntityParserFactory.java | 81 --
.../flume/sink/kite/policy/FailurePolicy.java | 105 --
.../sink/kite/policy/FailurePolicyFactory.java | 81 --
.../apache/flume/sink/kite/policy/RetryPolicy.java | 63 --
.../apache/flume/sink/kite/policy/SavePolicy.java | 128 ---
.../apache/flume/sink/kite/TestDatasetSink.java | 1036 --------------------
.../src/test/resources/enable-kerberos.xml | 30 -
flume-ng-sinks/flume-ng-hbase-sink/pom.xml | 5 -
flume-ng-sinks/flume-ng-hbase2-sink/pom.xml | 5 -
.../org/apache/flume/sink/kafka/TestKafkaSink.java | 12 +-
.../org/apache/flume/sink/kafka/util/TestUtil.java | 23 +-
flume-ng-sinks/pom.xml | 1 -
.../source/kafka/KafkaSourceEmbeddedKafka.java | 23 +-
pom.xml | 69 +-
31 files changed, 159 insertions(+), 2843 deletions(-)
diff --git a/conf/log4j2.xml b/conf/log4j2.xml
index d203a24..d3efac4 100644
--- a/conf/log4j2.xml
+++ b/conf/log4j2.xml
@@ -46,7 +46,7 @@
<Loggers>
<Logger name="org.apache.flume.lifecycle" level="info"/>
<Logger name="org.jboss" level="WARN"/>
- <Logger name="org.apache.avro.ipc.NettyTransceiver" level="WARN"/>
+ <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
<Logger name="org.apache.hadoop" level="INFO"/>
<Logger name="org.apache.hadoop.hive" level="ERROR"/>
<Root level="INFO">
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index b2a90db..bd81c4b 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
<properties>
<!-- TODO fix spotbugs violations -->
<spotbugs.maxAllowedViolations>115</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>112</pmd.maxAllowedViolations>
+ <pmd.maxAllowedViolations>121</pmd.maxAllowedViolations>
</properties>
<build>
@@ -324,12 +324,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>avro-ipc-netty</artifactId>
</dependency>
<dependency>
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index ac3d51d..6342aa9 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -20,8 +20,8 @@
package org.apache.flume.source;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.netty.NettyServer;
+import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index cc2c91a..bb62cff 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -20,8 +20,7 @@
package org.apache.flume.sink;
import com.google.common.base.Charsets;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Channel;
@@ -810,21 +809,20 @@ public class TestAvroSink {
private static class MockAvroServer implements AvroSourceProtocol {
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent event) {
logger.debug("Received event:{}", event);
return Status.OK;
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events)
- throws AvroRemoteException {
+ public Status appendBatch(List<AvroFlumeEvent> events) {
logger.debug("Received event batch:{}", events);
return Status.OK;
}
}
- private static class DelayMockAvroServer implements AvroSourceProtocol {
+ private static class DelayMockAvroServer implements
AvroSourceProtocol.Callback {
private final AtomicLong delay;
@@ -832,29 +830,51 @@ public class TestAvroSink {
this.delay = delay;
}
- private void sleep() throws AvroRemoteException {
+ private void sleep() throws IOException {
try {
Thread.sleep(delay.get());
} catch (InterruptedException e) {
- throw new AvroRemoteException("Interrupted while sleeping", e);
+ throw new IOException("Interrupted while sleeping", e);
}
}
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent event) {
logger.debug("Received event:{}; delaying for {}ms", event, delay);
- sleep();
+ try {
+ sleep();
+ } catch (IOException e) {
+ logger.debug("IOException detected");
+ }
return Status.OK;
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events)
- throws AvroRemoteException {
- logger.debug("Received event batch:{}; delaying for {}ms", events,
delay);
+ public void append(AvroFlumeEvent event,
+ org.apache.avro.ipc.Callback<Status> status)
+ throws IOException {
+ logger.debug("Received event:{}; delaying for {}ms", event, delay);
sleep();
+ }
+
+ @Override
+ public Status appendBatch(List<AvroFlumeEvent> events) {
+ logger.debug("Received event batch:{}; delaying for {}ms", events,
delay);
+ try {
+ sleep();
+ } catch (IOException e) {
+ logger.debug("IOException detected");
+ }
return Status.OK;
}
+ @Override
+ public void appendBatch(List<AvroFlumeEvent> events,
+ org.apache.avro.ipc.Callback<Status> status)
+ throws IOException {
+ logger.debug("Received event batch:{}; delaying for {}ms", events,
delay);
+ sleep();
+ }
}
private Server createSslServer(AvroSourceProtocol protocol)
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index 21e65ad..b9aab53 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -39,7 +39,8 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
-import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
@@ -218,13 +219,13 @@ public class TestAvroSource {
doRequest(true, true, 9);
}
- @Test(expected = org.apache.avro.AvroRemoteException.class)
+ @Test(expected = org.apache.avro.AvroRuntimeException.class)
public void testRequestWithCompressionOnServerOnly() throws
InterruptedException, IOException {
//This will fail because both client and server need compression on
doRequest(true, false, 6);
}
- @Test(expected = org.apache.avro.AvroRemoteException.class)
+ @Test(expected = org.apache.avro.AvroRuntimeException.class)
public void testRequestWithCompressionOnClientOnly() throws
InterruptedException, IOException {
//This will fail because both client and server need compression on
doRequest(false, true, 6);
@@ -585,7 +586,7 @@ public class TestAvroSource {
Status status = client.append(avroEvent);
logger.info("Client appended");
Assert.assertEquals(Status.OK, status);
- } catch (IOException e) {
+ } catch (IOException | AvroRuntimeException e) {
Assert.assertTrue("Should have been allowed: " + ruleDefinition,
!eventShouldBeAllowed);
return;
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
index 5bb7cf1..68a3a07 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
@@ -28,8 +28,7 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
@@ -204,13 +203,12 @@ public class TestEmbeddedAgent {
return null;
}
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent event) {
eventQueue.add(event);
return Status.OK;
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events)
- throws AvroRemoteException {
+ public Status appendBatch(List<AvroFlumeEvent> events) {
Preconditions.checkState(eventQueue.addAll(events));
return Status.OK;
}
diff --git a/flume-ng-legacy-sources/flume-avro-source/pom.xml b/flume-ng-legacy-sources/flume-avro-source/pom.xml
index b592744..9d9e9dc 100644
--- a/flume-ng-legacy-sources/flume-avro-source/pom.xml
+++ b/flume-ng-legacy-sources/flume-avro-source/pom.xml
@@ -32,7 +32,8 @@ limitations under the License.
<properties>
<!-- TODO fix spotbugs violations -->
- <spotbugs.maxAllowedViolations>2</spotbugs.maxAllowedViolations>
+ <spotbugs.maxAllowedViolations>5</spotbugs.maxAllowedViolations>
+ <pmd.maxAllowedViolations>37</pmd.maxAllowedViolations>
<thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
</properties>
@@ -131,7 +132,12 @@ limitations under the License.
<dependency>
<groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
+ <artifactId>avro-ipc-netty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc-jetty</artifactId>
</dependency>
</dependencies>
diff --git a/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java b/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
index dde8f28..5e87193 100644
--- a/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
+++ b/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.jetty.HttpServer;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
@@ -38,7 +38,6 @@ import org.apache.flume.source.AbstractSource;
import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
-import org.apache.avro.AvroRemoteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flume.ChannelException;
@@ -122,15 +121,15 @@ public class AvroLegacySource extends AbstractSource implements
}
@Override
- public Void append( AvroFlumeOGEvent evt ) throws AvroRemoteException {
+ public void append( AvroFlumeOGEvent evt ) {
counterGroup.incrementAndGet("rpc.received");
Map<String, String> headers = new HashMap<String, String>();
// extract Flume OG event headers
headers.put(HOST, evt.getHost().toString());
- headers.put(TIMESTAMP, evt.getTimestamp().toString());
+ headers.put(TIMESTAMP, Long.toString(evt.getTimestamp()));
headers.put(PRIORITY, evt.getPriority().toString());
- headers.put(NANOS, evt.getNanos().toString());
+ headers.put(NANOS, Long.toString(evt.getNanos()));
for (Entry<CharSequence, ByteBuffer> entry : evt.getFields().entrySet()) {
headers.put(entry.getKey().toString(), entry.getValue().toString());
}
@@ -141,11 +140,10 @@ public class AvroLegacySource extends AbstractSource implements
getChannelProcessor().processEvent(event);
counterGroup.incrementAndGet("rpc.events");
} catch (ChannelException ex) {
- return null;
+ return;
}
counterGroup.incrementAndGet("rpc.successful");
- return null;
}
@Override
diff --git a/flume-ng-sdk/pom.xml b/flume-ng-sdk/pom.xml
index 0376684..85ff8a4 100644
--- a/flume-ng-sdk/pom.xml
+++ b/flume-ng-sdk/pom.xml
@@ -30,8 +30,8 @@ limitations under the License.
<properties>
<!-- TODO fix spotbugs/pmd violations -->
- <spotbugs.maxAllowedViolations>67</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>131</pmd.maxAllowedViolations>
+ <spotbugs.maxAllowedViolations>69</spotbugs.maxAllowedViolations>
+ <pmd.maxAllowedViolations>170</pmd.maxAllowedViolations>
</properties>
<profiles>
@@ -227,7 +227,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
+ <artifactId>avro-ipc-netty</artifactId>
</dependency>
<dependency>
@@ -240,5 +240,10 @@ limitations under the License.
<artifactId>libthrift</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+
</dependencies>
</project>
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index cf52f6e..06d364f 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -56,7 +56,7 @@ import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import org.apache.avro.ipc.CallFuture;
-import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.netty.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.commons.lang.StringUtils;
@@ -704,10 +704,11 @@ public class NettyAvroRpcClient extends SSLContextAwareAbstractRpcClient {
KeyStore keystore = null;
if (truststore != null) {
- InputStream truststoreStream = new FileInputStream(truststore);
- keystore = KeyStore.getInstance(truststoreType);
- keystore.load(truststoreStream,
- truststorePassword != null ?
truststorePassword.toCharArray() : null);
+ try (InputStream truststoreStream = new
FileInputStream(truststore)) {
+ keystore = KeyStore.getInstance(truststoreType);
+ keystore.load(truststoreStream,
+ truststorePassword != null ?
truststorePassword.toCharArray() : null);
+ }
}
TrustManagerFactory tmf =
TrustManagerFactory.getInstance("SunX509");
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
index d9355f7..6750352 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
@@ -19,8 +19,8 @@
package org.apache.flume.api;
import junit.framework.Assert;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
+
+import org.apache.avro.ipc.netty.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
@@ -244,7 +244,7 @@ public class RpcTestUtils {
}
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent event) {
if (failed) {
logger.debug("Event rejected");
return Status.FAILED;
@@ -256,8 +256,7 @@ public class RpcTestUtils {
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events) throws
- AvroRemoteException {
+ public Status appendBatch(List<AvroFlumeEvent> events) {
if (failed) {
logger.debug("Event batch rejected");
return Status.FAILED;
@@ -276,15 +275,14 @@ public class RpcTestUtils {
public static class OKAvroHandler implements AvroSourceProtocol {
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent event) {
logger.info("OK: Received event from append(): {}",
new String(event.getBody().array(), Charset.forName("UTF8")));
return Status.OK;
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events) throws
- AvroRemoteException {
+ public Status appendBatch(List<AvroFlumeEvent> events) {
logger.info("OK: Received {} events from appendBatch()",
events.size());
return Status.OK;
@@ -298,14 +296,14 @@ public class RpcTestUtils {
public static class FailedAvroHandler implements AvroSourceProtocol {
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent event) {
logger.info("Failed: Received event from append(): {}",
new String(event.getBody().array(),
Charset.forName("UTF8")));
return Status.FAILED;
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events) throws
AvroRemoteException {
+ public Status appendBatch(List<AvroFlumeEvent> events) {
logger.info("Failed: Received {} events from appendBatch()",
events.size());
return Status.FAILED;
}
@@ -318,14 +316,14 @@ public class RpcTestUtils {
public static class UnknownAvroHandler implements AvroSourceProtocol {
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent event) {
logger.info("Unknown: Received event from append(): {}",
new String(event.getBody().array(),
Charset.forName("UTF8")));
return Status.UNKNOWN;
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events) throws
AvroRemoteException {
+ public Status appendBatch(List<AvroFlumeEvent> events) {
logger.info("Unknown: Received {} events from appendBatch()",
events.size());
return Status.UNKNOWN;
@@ -336,19 +334,37 @@ public class RpcTestUtils {
/**
* A service that logs receipt of the request and then throws an exception
*/
- public static class ThrowingAvroHandler implements AvroSourceProtocol {
+ public static class ThrowingAvroHandler
+ implements AvroSourceProtocol.Callback {
@Override
- public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ public void append(AvroFlumeEvent event,
+ org.apache.avro.ipc.Callback<Status> callback)
+ throws java.io.IOException {
logger.info("Throwing: Received event from append(): {}",
new String(event.getBody().array(),
Charset.forName("UTF8")));
- throw new AvroRemoteException("Handler smash!");
+ throw new java.io.IOException("Handler smash!");
+ }
+
+ @Override
+ public Status append(AvroFlumeEvent event) {
+ logger.info("Throwing unavailable: Received event from append(): {}",
+ new String(event.getBody().array(), Charset.forName("UTF8")));
+ return null;
}
@Override
- public Status appendBatch(List<AvroFlumeEvent> events) throws
AvroRemoteException {
+ public void appendBatch(List<AvroFlumeEvent> events,
+ org.apache.avro.ipc.Callback<Status> callback)
+ throws java.io.IOException {
logger.info("Throwing: Received {} events from appendBatch()",
events.size());
- throw new AvroRemoteException("Handler smash!");
+ throw new java.io.IOException("Handler smash!");
+ }
+
+ @Override
+ public Status appendBatch(List<AvroFlumeEvent> events) {
+ logger.info("Throwing unavailable: Received {} events from
appendBatch()", events.size());
+ return null;
}
}
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml
deleted file mode 100644
index 56fcbd5..0000000
--- a/flume-ng-sinks/flume-dataset-sink/pom.xml
+++ /dev/null
@@ -1,132 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>flume-ng-sinks</artifactId>
- <groupId>org.apache.flume</groupId>
- <version>1.10.0-SNAPSHOT</version>
- </parent>
-
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-dataset-sink</artifactId>
- <name>Flume NG Kite Dataset Sink</name>
-
- <properties>
- <!-- TODO fix spotbugspmd violations -->
- <spotbugs.maxAllowedViolations>8</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>11</pmd.maxAllowedViolations>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-configuration</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-hive</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-hbase</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-metastore</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <!-- build will fail if this is not hadoop-common 2.*
- because kite uses hflush.
- -->
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
-</project>
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
deleted file mode 100644
index 4a44264..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.kite;
-
-import org.apache.flume.auth.FlumeAuthenticationUtil;
-import org.apache.flume.auth.PrivilegedExecutor;
-import org.apache.flume.conf.BatchSizeSupported;
-import org.apache.flume.sink.kite.parser.EntityParserFactory;
-import org.apache.flume.sink.kite.parser.EntityParser;
-import org.apache.flume.sink.kite.policy.FailurePolicy;
-import org.apache.flume.sink.kite.policy.FailurePolicyFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetIOException;
-import org.kitesdk.data.DatasetNotFoundException;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Flushable;
-import org.kitesdk.data.Syncable;
-import org.kitesdk.data.View;
-import org.kitesdk.data.spi.Registration;
-import org.kitesdk.data.URIBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-import org.kitesdk.data.Format;
-import org.kitesdk.data.Formats;
-
-/**
- * Sink that writes events to a Kite Dataset. This sink will parse the body of
- * each incoming event and store the resulting entity in a Kite Dataset. It
- * determines the destination Dataset by opening a dataset URI
- * {@code kite.dataset.uri} or opening a repository URI, {@code kite.repo.uri},
- * and loading a Dataset by name, {@code kite.dataset.name}, and namespace,
- * {@code kite.dataset.namespace}.
- */
-public class DatasetSink extends AbstractSink implements Configurable,
BatchSizeSupported {
-
- private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
-
- private Context context = null;
- private PrivilegedExecutor privilegedExecutor;
-
- private String datasetName = null;
- private URI datasetUri = null;
- private Schema datasetSchema = null;
- private DatasetWriter<GenericRecord> writer = null;
-
- /**
- * The number of events to process as a single batch.
- */
- private long batchSize = DEFAULT_BATCH_SIZE;
-
- /**
- * The number of seconds to wait before rolling a writer.
- */
- private int rollIntervalSeconds = DEFAULT_ROLL_INTERVAL;
-
- /**
- * Flag that says if Flume should commit on every batch.
- */
- private boolean commitOnBatch = DEFAULT_FLUSHABLE_COMMIT_ON_BATCH;
-
- /**
- * Flag that says if Flume should sync on every batch.
- */
- private boolean syncOnBatch = DEFAULT_SYNCABLE_SYNC_ON_BATCH;
-
- /**
- * The last time the writer rolled.
- */
- private long lastRolledMillis = 0L;
-
- /**
- * The raw number of bytes parsed.
- */
- private long bytesParsed = 0L;
-
- /**
- * A class for parsing Kite entities from Flume Events.
- */
- private EntityParser<GenericRecord> parser = null;
-
- /**
- * A class implementing a failure newPolicy for events that had a
- non-recoverable error during processing.
- */
- private FailurePolicy failurePolicy = null;
-
- private SinkCounter counter = null;
-
- /**
- * The Kite entity
- */
- private GenericRecord entity = null;
- // TODO: remove this after PARQUET-62 is released
- private boolean reuseEntity = true;
-
- /**
- * The Flume transaction. Used to keep transactions open across calls to
- * process.
- */
- private Transaction transaction = null;
-
- /**
- * Internal flag on if there has been a batch of records committed. This is
- * used during rollback to know if the current writer needs to be closed.
- */
- private boolean committedBatch = false;
-
- // Factories
- private static final EntityParserFactory ENTITY_PARSER_FACTORY =
- new EntityParserFactory();
- private static final FailurePolicyFactory FAILURE_POLICY_FACTORY =
- new FailurePolicyFactory();
-
- /**
- * Return the list of allowed formats.
- * @return The list of allowed formats.
- */
- protected List<String> allowedFormats() {
- return Lists.newArrayList("avro", "parquet");
- }
-
- @Override
- public void configure(Context context) {
- this.context = context;
-
- String principal = context.getString(AUTH_PRINCIPAL);
- String keytab = context.getString(AUTH_KEYTAB);
- String effectiveUser = context.getString(AUTH_PROXY_USER);
-
- this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
- principal, keytab).proxyAs(effectiveUser);
-
- // Get the dataset URI and name from the context
- String datasetURI = context.getString(CONFIG_KITE_DATASET_URI);
- if (datasetURI != null) {
- this.datasetUri = URI.create(datasetURI);
- this.datasetName = uriToName(datasetUri);
- } else {
- String repositoryURI = context.getString(CONFIG_KITE_REPO_URI);
- Preconditions.checkNotNull(repositoryURI, "No dataset configured.
Setting "
- + CONFIG_KITE_DATASET_URI + " is required.");
-
- this.datasetName = context.getString(CONFIG_KITE_DATASET_NAME);
- Preconditions.checkNotNull(datasetName, "No dataset configured. Setting "
- + CONFIG_KITE_DATASET_URI + " is required.");
-
- String namespace = context.getString(CONFIG_KITE_DATASET_NAMESPACE,
- DEFAULT_NAMESPACE);
-
- this.datasetUri = new URIBuilder(repositoryURI, namespace, datasetName)
- .build();
- }
- this.setName(datasetUri.toString());
-
- if (context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH,
- DEFAULT_SYNCABLE_SYNC_ON_BATCH)) {
- Preconditions.checkArgument(
- context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
- DEFAULT_FLUSHABLE_COMMIT_ON_BATCH), "Configuration error: "
- + CONFIG_FLUSHABLE_COMMIT_ON_BATCH + " must be set to true
when "
- + CONFIG_SYNCABLE_SYNC_ON_BATCH + " is set to true.");
- }
-
- // Create the configured failure failurePolicy
- this.failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
-
- // other configuration
- this.batchSize = context.getLong(CONFIG_KITE_BATCH_SIZE,
- DEFAULT_BATCH_SIZE);
- this.rollIntervalSeconds = context.getInteger(CONFIG_KITE_ROLL_INTERVAL,
- DEFAULT_ROLL_INTERVAL);
-
- this.counter = new SinkCounter(datasetName);
- }
-
- @Override
- public synchronized void start() {
- this.lastRolledMillis = System.currentTimeMillis();
- counter.start();
- // signal that this sink is ready to process
- LOG.info("Started DatasetSink " + getName());
- super.start();
- }
-
- /**
- * Causes the sink to roll at the next {@link #process()} call.
- */
- @VisibleForTesting
- void roll() {
- this.lastRolledMillis = 0L;
- }
-
- @VisibleForTesting
- DatasetWriter<GenericRecord> getWriter() {
- return writer;
- }
-
- @VisibleForTesting
- void setWriter(DatasetWriter<GenericRecord> writer) {
- this.writer = writer;
- }
-
- @VisibleForTesting
- void setParser(EntityParser<GenericRecord> parser) {
- this.parser = parser;
- }
-
- @VisibleForTesting
- void setFailurePolicy(FailurePolicy failurePolicy) {
- this.failurePolicy = failurePolicy;
- }
-
- @Override
- public synchronized void stop() {
- counter.stop();
-
- try {
- // Close the writer and commit the transaction, but don't create a new
- // writer since we're stopping
- closeWriter();
- commitTransaction();
- } catch (EventDeliveryException ex) {
- rollbackTransaction();
-
- LOG.warn("Closing the writer failed: " + ex.getLocalizedMessage());
- LOG.debug("Exception follows.", ex);
- // We don't propogate the exception as the transaction would have been
- // rolled back and we can still finish stopping
- }
-
- // signal that this sink has stopped
- LOG.info("Stopped dataset sink: " + getName());
- super.stop();
- }
-
- @Override
- public Status process() throws EventDeliveryException {
- long processedEvents = 0;
-
- try {
- if (shouldRoll()) {
- closeWriter();
- commitTransaction();
- createWriter();
- }
-
- // The writer shouldn't be null at this point
- Preconditions.checkNotNull(writer,
- "Can't process events with a null writer. This is likely a bug.");
- Channel channel = getChannel();
-
- // Enter the transaction boundary if we haven't already
- enterTransaction(channel);
-
- for (; processedEvents < batchSize; processedEvents += 1) {
- Event event = channel.take();
-
- if (event == null) {
- // no events available in the channel
- break;
- }
-
- write(event);
- }
-
- // commit transaction
- if (commitOnBatch) {
- // Flush/sync before commiting. A failure here will result in rolling
back
- // the transaction
- if (syncOnBatch && writer instanceof Syncable) {
- ((Syncable) writer).sync();
- } else if (writer instanceof Flushable) {
- ((Flushable) writer).flush();
- }
- boolean committed = commitTransaction();
- Preconditions.checkState(committed,
- "Tried to commit a batch when there was no transaction");
- committedBatch |= committed;
- }
- } catch (Throwable th) {
- // catch-all for any unhandled Throwable so that the transaction is
- // correctly rolled back.
- rollbackTransaction();
-
- if (commitOnBatch && committedBatch) {
- try {
- closeWriter();
- } catch (EventDeliveryException ex) {
- LOG.warn("Error closing writer there may be temp files that need to"
- + " be manually recovered: " + ex.getLocalizedMessage());
- LOG.debug("Exception follows.", ex);
- }
- } else {
- this.writer = null;
- }
-
- // handle the exception
- Throwables.propagateIfInstanceOf(th, Error.class);
- Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
- throw new EventDeliveryException(th);
- }
-
- if (processedEvents == 0) {
- counter.incrementBatchEmptyCount();
- return Status.BACKOFF;
- } else if (processedEvents < batchSize) {
- counter.incrementBatchUnderflowCount();
- } else {
- counter.incrementBatchCompleteCount();
- }
-
- counter.addToEventDrainSuccessCount(processedEvents);
-
- return Status.READY;
- }
-
- /**
- * Parse the event using the entity parser and write the entity to the
dataset.
- *
- * @param event The event to write
- * @throws EventDeliveryException An error occurred trying to write to the
- dataset that couldn't or shouldn't be
- handled by the failure policy.
- */
- @VisibleForTesting
- void write(Event event) throws EventDeliveryException {
- try {
- this.entity = parser.parse(event, reuseEntity ? entity : null);
- this.bytesParsed += event.getBody().length;
-
- // writeEncoded would be an optimization in some cases, but HBase
- // will not support it and partitioned Datasets need to get partition
- // info from the entity Object. We may be able to avoid the
- // serialization round-trip otherwise.
- writer.write(entity);
- } catch (NonRecoverableEventException ex) {
- failurePolicy.handle(event, ex);
- } catch (DataFileWriter.AppendWriteException ex) {
- failurePolicy.handle(event, ex);
- } catch (RuntimeException ex) {
- Throwables.propagateIfInstanceOf(ex, EventDeliveryException.class);
- throw new EventDeliveryException(ex);
- }
- }
-
- /**
- * Create a new writer.
- *
- * This method also re-loads the dataset so updates to the configuration or
- * a dataset created after Flume starts will be loaded.
- *
- * @throws EventDeliveryException There was an error creating the writer.
- */
- @VisibleForTesting
- void createWriter() throws EventDeliveryException {
- // reset the commited flag whenever a new writer is created
- committedBatch = false;
- try {
- View<GenericRecord> view;
-
- view = privilegedExecutor.execute(
- new PrivilegedAction<Dataset<GenericRecord>>() {
- @Override
- public Dataset<GenericRecord> run() {
- return Datasets.load(datasetUri);
- }
- });
-
- DatasetDescriptor descriptor = view.getDataset().getDescriptor();
- Format format = descriptor.getFormat();
- Preconditions.checkArgument(allowedFormats().contains(format.getName()),
- "Unsupported format: " + format.getName());
-
- Schema newSchema = descriptor.getSchema();
- if (datasetSchema == null || !newSchema.equals(datasetSchema)) {
- this.datasetSchema = descriptor.getSchema();
- // dataset schema has changed, create a new parser
- parser = ENTITY_PARSER_FACTORY.newParser(datasetSchema, context);
- }
-
- this.reuseEntity = !(Formats.PARQUET.equals(format));
-
- // TODO: Check that the format implements Flushable after CDK-863
- // goes in. For now, just check that the Dataset is Avro format
- this.commitOnBatch = context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
- DEFAULT_FLUSHABLE_COMMIT_ON_BATCH) && (Formats.AVRO.equals(format));
-
- // TODO: Check that the format implements Syncable after CDK-863
- // goes in. For now, just check that the Dataset is Avro format
- this.syncOnBatch = context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH,
- DEFAULT_SYNCABLE_SYNC_ON_BATCH) && (Formats.AVRO.equals(format));
-
- this.datasetName = view.getDataset().getName();
-
- this.writer = view.newWriter();
-
- // Reset the last rolled time and the metrics
- this.lastRolledMillis = System.currentTimeMillis();
- this.bytesParsed = 0L;
- } catch (DatasetNotFoundException ex) {
- throw new EventDeliveryException("Dataset " + datasetUri + " not found."
- + " The dataset must be created before Flume can write to it.", ex);
- } catch (RuntimeException ex) {
- throw new EventDeliveryException("Error trying to open a new"
- + " writer for dataset " + datasetUri, ex);
- }
- }
-
- /**
- * Return true if the sink should roll the writer.
- *
- * Currently, this is based on time since the last roll or if the current
- * writer is null.
- *
- * @return True if and only if the sink should roll the writer
- */
- private boolean shouldRoll() {
- long currentTimeMillis = System.currentTimeMillis();
- long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(
- currentTimeMillis - lastRolledMillis);
-
- LOG.debug("Current time: {}, lastRolled: {}, diff: {} sec",
- new Object[] {currentTimeMillis, lastRolledMillis,
elapsedTimeSeconds});
-
- return elapsedTimeSeconds >= rollIntervalSeconds || writer == null;
- }
-
- /**
- * Close the current writer.
- *
- * This method always sets the current writer to null even if close fails.
- * If this method throws an Exception, callers *must* rollback any active
- * transaction to ensure that data is replayed.
- *
- * @throws EventDeliveryException
- */
- @VisibleForTesting
- void closeWriter() throws EventDeliveryException {
- if (writer != null) {
- try {
- writer.close();
-
- long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(
- System.currentTimeMillis() - lastRolledMillis);
- LOG.info("Closed writer for {} after {} seconds and {} bytes parsed",
- new Object[]{datasetUri, elapsedTimeSeconds, bytesParsed});
- } catch (DatasetIOException ex) {
- throw new EventDeliveryException("Check HDFS permissions/health. IO"
- + " error trying to close the writer for dataset " + datasetUri,
- ex);
- } catch (RuntimeException ex) {
- throw new EventDeliveryException("Error trying to close the writer
for"
- + " dataset " + datasetUri, ex);
- } finally {
- // If we failed to close the writer then we give up on it as we'll
- // end up throwing an EventDeliveryException which will result in
- // a transaction rollback and a replay of any events written during
- // the current transaction. If commitOnBatch is true, you can still
- // end up with orphaned temp files that have data to be recovered.
- this.writer = null;
- failurePolicy.close();
- }
- }
- }
-
- /**
- * Enter the transaction boundary. This will either begin a new transaction
- * if one didn't already exist. If we're already in a transaction boundary,
- * then this method does nothing.
- *
- * @param channel The Sink's channel
- * @throws EventDeliveryException There was an error starting a new batch
- * with the failure policy.
- */
- private void enterTransaction(Channel channel) throws EventDeliveryException
{
- // There's no synchronization around the transaction instance because the
- // Sink API states "the Sink#process() call is guaranteed to only
- // be accessed by a single thread". Technically other methods could be
- // called concurrently, but the implementation of SinkRunner waits
- // for the Thread running process() to end before calling stop()
- if (transaction == null) {
- this.transaction = channel.getTransaction();
- transaction.begin();
- failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
- }
- }
-
- /**
- * Commit and close the transaction.
- *
- * If this method throws an Exception the caller *must* ensure that the
- * transaction is rolled back. Callers can roll back the transaction by
- * calling {@link #rollbackTransaction()}.
- *
- * @return True if there was an open transaction and it was committed, false
- * otherwise.
- * @throws EventDeliveryException There was an error ending the batch with
- * the failure policy.
- */
- @VisibleForTesting
- boolean commitTransaction() throws EventDeliveryException {
- if (transaction != null) {
- failurePolicy.sync();
- transaction.commit();
- transaction.close();
- this.transaction = null;
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Rollback the transaction. If there is a RuntimeException during rollback,
- * it will be logged but the transaction instance variable will still be
- * nullified.
- */
- private void rollbackTransaction() {
- if (transaction != null) {
- try {
- // If the transaction wasn't committed before we got the exception, we
- // need to rollback.
- transaction.rollback();
- } catch (RuntimeException ex) {
- LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage());
- LOG.debug("Exception follows.", ex);
- } finally {
- transaction.close();
- this.transaction = null;
- }
- }
- }
-
- /**
- * Get the name of the dataset from the URI
- *
- * @param uri The dataset or view URI
- * @return The dataset name
- */
- private static String uriToName(URI uri) {
- return Registration.lookupDatasetUri(URI.create(
- uri.getRawSchemeSpecificPart())).second().get("dataset");
- }
-
- @Override
- public long getBatchSize() {
- return batchSize;
- }
-}
diff --git
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
deleted file mode 100644
index af33304..0000000
---
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-import org.kitesdk.data.URIBuilder;
-
-public class DatasetSinkConstants {
- /**
- * URI of the Kite Dataset
- */
- public static final String CONFIG_KITE_DATASET_URI = "kite.dataset.uri";
-
- /**
- * URI of the Kite DatasetRepository.
- */
- public static final String CONFIG_KITE_REPO_URI = "kite.repo.uri";
-
- /**
- * Name of the Kite Dataset to write into.
- */
- public static final String CONFIG_KITE_DATASET_NAME = "kite.dataset.name";
-
- /**
- * Namespace of the Kite Dataset to write into.
- */
- public static final String CONFIG_KITE_DATASET_NAMESPACE =
- "kite.dataset.namespace";
- public static final String DEFAULT_NAMESPACE = URIBuilder.NAMESPACE_DEFAULT;
-
- /**
- * Number of records to process from the incoming channel per call to
process.
- */
- public static final String CONFIG_KITE_BATCH_SIZE = "kite.batchSize";
- public static long DEFAULT_BATCH_SIZE = 100;
-
- /**
- * Maximum time to wait before finishing files.
- */
- public static final String CONFIG_KITE_ROLL_INTERVAL = "kite.rollInterval";
- public static int DEFAULT_ROLL_INTERVAL = 30; // seconds
-
- /**
- * Flag for committing the Flume transaction on each batch for Flushable
- * datasets. When set to false, Flume will only commit the transaction when
- * roll interval has expired. Setting this to false requires enough space
- * in the channel to handle all events delivered during the roll interval.
- * Defaults to true.
- */
- public static final String CONFIG_FLUSHABLE_COMMIT_ON_BATCH =
- "kite.flushable.commiteOnBatch";
- public static boolean DEFAULT_FLUSHABLE_COMMIT_ON_BATCH = true;
-
- /**
- * Flag for syncing the DatasetWriter on each batch for Syncable
- * datasets. Defaults to true.
- */
- public static final String CONFIG_SYNCABLE_SYNC_ON_BATCH =
- "kite.syncable.syncOnBatch";
- public static boolean DEFAULT_SYNCABLE_SYNC_ON_BATCH = true;
-
- /**
- * Parser used to parse Flume Events into Kite entities.
- */
- public static final String CONFIG_ENTITY_PARSER = "kite.entityParser";
-
- /**
- * Built-in entity parsers
- */
- public static final String AVRO_ENTITY_PARSER = "avro";
- public static final String DEFAULT_ENTITY_PARSER = AVRO_ENTITY_PARSER;
- public static final String[] AVAILABLE_PARSERS = new String[] {
- AVRO_ENTITY_PARSER
- };
-
- /**
- * Policy used to handle non-recoverable failures.
- */
- public static final String CONFIG_FAILURE_POLICY = "kite.failurePolicy";
-
- /**
- * Write non-recoverable Flume events to a Kite dataset.
- */
- public static final String SAVE_FAILURE_POLICY = "save";
-
- /**
- * The URI to write non-recoverable Flume events to in the case of an error.
- * If the dataset doesn't exist, it will be created.
- */
- public static final String CONFIG_KITE_ERROR_DATASET_URI =
- "kite.error.dataset.uri";
-
- /**
- * Retry non-recoverable Flume events. This will lead to a never ending cycle
- * of failure, but matches the previous default semantics of the DatasetSink.
- */
- public static final String RETRY_FAILURE_POLICY = "retry";
- public static final String DEFAULT_FAILURE_POLICY = RETRY_FAILURE_POLICY;
- public static final String[] AVAILABLE_POLICIES = new String[] {
- RETRY_FAILURE_POLICY,
- SAVE_FAILURE_POLICY
- };
-
- /**
- * Headers where avro schema information is expected.
- */
- public static final String AVRO_SCHEMA_LITERAL_HEADER =
- "flume.avro.schema.literal";
- public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url";
-
- /**
- * Hadoop authentication settings
- */
- public static final String AUTH_PROXY_USER = "auth.proxyUser";
- public static final String AUTH_PRINCIPAL = "auth.kerberosPrincipal";
- public static final String AUTH_KEYTAB = "auth.kerberosKeytab";
-}
diff --git
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
deleted file mode 100644
index 991869a..0000000
---
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-/**
- * A non-recoverable error trying to deliver the event.
- *
- * Non-recoverable event delivery failures include:
- *
- * 1. Error parsing the event body thrown from the {@link EntityParser}
- * 2. A schema mismatch between the schema of an event and the schema of the
- * destination dataset.
- * 3. A missing schema from the Event header when using the
- * {@link AvroEntityParser}.
- */
-public class NonRecoverableEventException extends Exception {
-
- private static final long serialVersionUID = 3485151222482254285L;
-
- public NonRecoverableEventException() {
- super();
- }
-
- public NonRecoverableEventException(String message) {
- super(message);
- }
-
- public NonRecoverableEventException(String message, Throwable t) {
- super(message, t);
- }
-
- public NonRecoverableEventException(Throwable t) {
- super(t);
- }
-
-}
diff --git
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
deleted file mode 100644
index 7c6a723..0000000
---
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.parser;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.sink.kite.NonRecoverableEventException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-/**
- * An {@link EntityParser} that parses Avro serialized bytes from an event.
- *
- * The Avro schema used to serialize the data should be set as either a URL
- * or literal in the flume.avro.schema.url or flume.avro.schema.literal event
- * headers respectively.
- */
-public class AvroParser implements EntityParser<GenericRecord> {
-
- static Configuration conf = new Configuration();
-
- /**
- * A cache of literal schemas to avoid re-parsing the schema.
- */
- private static final LoadingCache<String, Schema> schemasFromLiteral =
- CacheBuilder.newBuilder()
- .build(new CacheLoader<String, Schema>() {
- @Override
- public Schema load(String literal) {
- Preconditions.checkNotNull(literal,
- "Schema literal cannot be null without a Schema URL");
- return new Schema.Parser().parse(literal);
- }
- });
-
- /**
- * A cache of schemas retrieved by URL to avoid re-parsing the schema.
- */
- private static final LoadingCache<String, Schema> schemasFromURL =
- CacheBuilder.newBuilder()
- .build(new CacheLoader<String, Schema>() {
- @Override
- public Schema load(String url) throws IOException {
- Schema.Parser parser = new Schema.Parser();
- InputStream is = null;
- try {
- FileSystem fs = FileSystem.get(URI.create(url), conf);
- if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
- is = fs.open(new Path(url));
- } else {
- is = new URL(url).openStream();
- }
- return parser.parse(is);
- } finally {
- if (is != null) {
- is.close();
- }
- }
- }
- });
-
- /**
- * The schema of the destination dataset.
- *
- * Used as the reader schema during parsing.
- */
- private final Schema datasetSchema;
-
- /**
- * A cache of DatumReaders per schema.
- */
- private final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
- CacheBuilder.newBuilder()
- .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
- @Override
- public DatumReader<GenericRecord> load(Schema schema) {
- // must use the target dataset's schema for reading to ensure the
- // records are able to be stored using it
- return new GenericDatumReader<GenericRecord>(
- schema, datasetSchema);
- }
- });
-
- /**
- * The binary decoder to reuse for event parsing.
- */
- private BinaryDecoder decoder = null;
-
- /**
- * Create a new AvroParser given the schema of the destination dataset.
- *
- * @param datasetSchema The schema of the destination dataset.
- */
- private AvroParser(Schema datasetSchema) {
- this.datasetSchema = datasetSchema;
- }
-
- /**
- * Parse the entity from the body of the given event.
- *
- * @param event The event to parse.
- * @param reuse If non-null, this may be reused and returned from this
method.
- * @return The parsed entity as a GenericRecord.
- * @throws EventDeliveryException A recoverable error such as an error
- * downloading the schema from the URL has
- * occurred.
- * @throws NonRecoverableEventException A non-recoverable error such as an
- * unparsable schema or entity has
- * occurred.
- */
- @Override
- public GenericRecord parse(Event event, GenericRecord reuse)
- throws EventDeliveryException, NonRecoverableEventException {
- decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
-
- try {
- DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event));
- return reader.read(reuse, decoder);
- } catch (IOException ex) {
- throw new NonRecoverableEventException("Cannot deserialize event", ex);
- } catch (RuntimeException ex) {
- throw new NonRecoverableEventException("Cannot deserialize event", ex);
- }
- }
-
- /**
- * Get the schema from the event headers.
- *
- * @param event The Flume event
- * @return The schema for the event
- * @throws EventDeliveryException A recoverable error such as an error
- * downloading the schema from the URL has
- * occurred.
- * @throws NonRecoverableEventException A non-recoverable error such as an
- * unparsable schema has occurred.
- */
- private static Schema schema(Event event) throws EventDeliveryException,
- NonRecoverableEventException {
- Map<String, String> headers = event.getHeaders();
- String schemaURL = headers.get(AVRO_SCHEMA_URL_HEADER);
- try {
- if (schemaURL != null) {
- return schemasFromURL.get(schemaURL);
- } else {
- String schemaLiteral = headers.get(AVRO_SCHEMA_LITERAL_HEADER);
- if (schemaLiteral == null) {
- throw new NonRecoverableEventException("No schema in event headers."
- + " Headers must include either " + AVRO_SCHEMA_URL_HEADER
- + " or " + AVRO_SCHEMA_LITERAL_HEADER);
- }
-
- return schemasFromLiteral.get(schemaLiteral);
- }
- } catch (ExecutionException ex) {
- throw new EventDeliveryException("Cannot get schema", ex.getCause());
- } catch (UncheckedExecutionException ex) {
- throw new NonRecoverableEventException("Cannot parse schema",
- ex.getCause());
- }
- }
-
- public static class Builder implements EntityParser.Builder<GenericRecord> {
-
- @Override
- public EntityParser<GenericRecord> build(Schema datasetSchema, Context
config) {
- return new AvroParser(datasetSchema);
- }
-
- }
-}
diff --git
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
deleted file mode 100644
index f2051a2..0000000
---
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.parser;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.avro.Schema;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.sink.kite.NonRecoverableEventException;
-
-@NotThreadSafe
-public interface EntityParser<E> {
-
- /**
- * Parse a Kite entity from a Flume event
- *
- * @param event The event to parse
- * @param reuse If non-null, this may be reused and returned
- * @return The parsed entity
- * @throws EventDeliveryException A recoverable error during parsing. Parsing
- * can be safely retried.
- * @throws NonRecoverableEventException A non-recoverable error during
- * parsing. The event must be discarded.
- *
- */
- public E parse(Event event, E reuse) throws EventDeliveryException,
- NonRecoverableEventException;
-
- /**
- * Knows how to build {@code EntityParser}s. Implementers must provide a
- * no-arg constructor.
- *
- * @param <E> The type of entities generated
- */
- public static interface Builder<E> {
-
- public EntityParser<E> build(Schema datasetSchema, Context config);
- }
-}
diff --git
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
deleted file mode 100644
index 3720ff3..0000000
---
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.parser;
-
-import java.util.Arrays;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flume.Context;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-public class EntityParserFactory {
-
- public EntityParser<GenericRecord> newParser(Schema datasetSchema, Context
config) {
- EntityParser<GenericRecord> parser;
-
- String parserType = config.getString(CONFIG_ENTITY_PARSER,
- DEFAULT_ENTITY_PARSER);
-
- if (parserType.equals(AVRO_ENTITY_PARSER)) {
- parser = new AvroParser.Builder().build(datasetSchema, config);
- } else {
-
- Class<? extends EntityParser.Builder> builderClass;
- Class c;
- try {
- c = Class.forName(parserType);
- } catch (ClassNotFoundException ex) {
- throw new IllegalArgumentException("EntityParser.Builder class "
- + parserType + " not found. Must set " + CONFIG_ENTITY_PARSER
- + " to a class that implements EntityParser.Builder or to a
builtin"
- + " parser: " + Arrays.toString(AVAILABLE_PARSERS), ex);
- }
-
- if (c != null && EntityParser.Builder.class.isAssignableFrom(c)) {
- builderClass = c;
- } else {
- throw new IllegalArgumentException("Class " + parserType + " does not"
- + " implement EntityParser.Builder. Must set "
- + CONFIG_ENTITY_PARSER + " to a class that extends"
- + " EntityParser.Builder or to a builtin parser: "
- + Arrays.toString(AVAILABLE_PARSERS));
- }
-
- EntityParser.Builder<GenericRecord> builder;
- try {
- builder = builderClass.newInstance();
- } catch (InstantiationException ex) {
- throw new IllegalArgumentException("Can't instantiate class "
- + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class"
- + " that extends EntityParser.Builder or to a builtin parser: "
- + Arrays.toString(AVAILABLE_PARSERS), ex);
- } catch (IllegalAccessException ex) {
- throw new IllegalArgumentException("Can't instantiate class "
- + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class"
- + " that extends EntityParser.Builder or to a builtin parser: "
- + Arrays.toString(AVAILABLE_PARSERS), ex);
- }
-
- parser = builder.build(datasetSchema, config);
- }
-
- return parser;
- }
-}
diff --git
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
deleted file mode 100644
index f6f875a..0000000
---
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.sink.kite.DatasetSink;
-import org.kitesdk.data.Syncable;
-
-/**
- * A policy for dealing with non-recoverable event delivery failures.
- *
- * Non-recoverable event delivery failures include:
- *
- * 1. Error parsing the event body thrown from the {@link EntityParser}
- * 2. A schema mismatch between the schema of an event and the schema of the
- * destination dataset.
- * 3. A missing schema from the Event header when using the
- * {@link AvroEntityParser}.
- *
- * The life cycle of a FailurePolicy mimics the life cycle of the
- * {@link DatasetSink#writer}:
- *
- * 1. When a new writer is created, the policy will be instantiated.
- * 2. As Event failures happen,
- * {@link #handle(org.apache.flume.Event, java.lang.Throwable)} will be
- * called to let the policy handle the failure.
- * 3. If the {@link DatasetSink} is configured to commit on batch, then the
- * {@link #sync()} method will be called when the batch is committed.
- * 4. When the writer is closed, the policy's {@link #close()} method will be
- * called.
- */
-public interface FailurePolicy {
-
- /**
- * Handle a non-recoverable event.
- *
- * @param event The event
- * @param cause The cause of the failure
- * @throws EventDeliveryException The policy failed to handle the event. When
- * this is thrown, the Flume transaction will
- * be rolled back and the event will be retried
- * along with the rest of the batch.
- */
- public void handle(Event event, Throwable cause)
- throws EventDeliveryException;
-
- /**
- * Ensure any handled events are on stable storage.
- *
- * This allows the policy implementation to sync any data that it may not
- * have fully handled.
- *
- * See {@link Syncable#sync()}.
- *
- * @throws EventDeliveryException The policy failed while syncing data.
- * When this is thrown, the Flume transaction
- * will be rolled back and the batch will be
- * retried.
- */
- public void sync() throws EventDeliveryException;
-
- /**
- * Close this FailurePolicy and release any resources.
- *
- * @throws EventDeliveryException The policy failed while closing resources.
- * When this is thrown, the Flume transaction
- * will be rolled back and the batch will be
- * retried.
- */
- public void close() throws EventDeliveryException;
-
- /**
- * Knows how to build {@code FailurePolicy}s. Implementers must provide a
- * no-arg constructor.
- */
- public static interface Builder {
-
- /**
- * Build a new {@code FailurePolicy}
- *
- * @param config The Flume configuration context
- * @return The {@code FailurePolicy}
- */
- FailurePolicy build(Context config);
- }
-
-}
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
deleted file mode 100644
index d3b1fe8..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import java.util.Arrays;
-import org.apache.flume.Context;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-public class FailurePolicyFactory {
-
- public FailurePolicy newPolicy(Context config) {
- FailurePolicy policy;
-
- String policyType = config.getString(CONFIG_FAILURE_POLICY,
- DEFAULT_FAILURE_POLICY);
-
- if (policyType.equals(RETRY_FAILURE_POLICY)) {
- policy = new RetryPolicy.Builder().build(config);
- } else if (policyType.equals(SAVE_FAILURE_POLICY)) {
- policy = new SavePolicy.Builder().build(config);
- } else {
-
- Class<? extends FailurePolicy.Builder> builderClass;
- Class c;
- try {
- c = Class.forName(policyType);
- } catch (ClassNotFoundException ex) {
- throw new IllegalArgumentException("FailurePolicy.Builder class "
- + policyType + " not found. Must set " + CONFIG_FAILURE_POLICY
- + " to a class that implements FailurePolicy.Builder or to a builtin"
- + " policy: " + Arrays.toString(AVAILABLE_POLICIES), ex);
- }
-
- if (c != null && FailurePolicy.Builder.class.isAssignableFrom(c)) {
- builderClass = c;
- } else {
- throw new IllegalArgumentException("Class " + policyType + " does not"
- + " implement FailurePolicy.Builder. Must set "
- + CONFIG_FAILURE_POLICY + " to a class that extends"
- + " FailurePolicy.Builder or to a builtin policy: "
- + Arrays.toString(AVAILABLE_POLICIES));
- }
-
- FailurePolicy.Builder builder;
- try {
- builder = builderClass.newInstance();
- } catch (InstantiationException ex) {
- throw new IllegalArgumentException("Can't instantiate class "
- + policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a class"
- + " that extends FailurePolicy.Builder or to a builtin policy: "
- + Arrays.toString(AVAILABLE_POLICIES), ex);
- } catch (IllegalAccessException ex) {
- throw new IllegalArgumentException("Can't instantiate class "
- + policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a class"
- + " that extends FailurePolicy.Builder or to a builtin policy: "
- + Arrays.toString(AVAILABLE_POLICIES), ex);
- }
-
- policy = builder.build(config);
- }
-
- return policy;
- }
-}
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
deleted file mode 100644
index 9a4991c..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A failure policy that logs the error and then forces a retry by throwing
- * {@link EventDeliveryException}.
- */
-public class RetryPolicy implements FailurePolicy {
- private static final Logger LOG = LoggerFactory.getLogger(RetryPolicy.class);
-
- private RetryPolicy() {
- }
-
- @Override
- public void handle(Event event, Throwable cause) throws EventDeliveryException {
- LOG.error("Event delivery failed: " + cause.getLocalizedMessage());
- LOG.debug("Exception follows.", cause);
-
- throw new EventDeliveryException(cause);
- }
-
- @Override
- public void sync() throws EventDeliveryException {
- // do nothing
- }
-
- @Override
- public void close() throws EventDeliveryException {
- // do nothing
- }
-
- public static class Builder implements FailurePolicy.Builder {
-
- @Override
- public FailurePolicy build(Context config) {
- return new RetryPolicy();
- }
-
- }
-}
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
deleted file mode 100644
index bd537ec..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite.policy;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.Syncable;
-import org.kitesdk.data.View;
-
-import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
-
-/**
- * A failure policy that writes the raw Flume event to a Kite dataset.
- */
-public class SavePolicy implements FailurePolicy {
-
- private final View<AvroFlumeEvent> dataset;
- private DatasetWriter<AvroFlumeEvent> writer;
- private int nEventsHandled;
-
- private SavePolicy(Context context) {
- String uri = context.getString(CONFIG_KITE_ERROR_DATASET_URI);
- Preconditions.checkArgument(uri != null, "Must set "
- + CONFIG_KITE_ERROR_DATASET_URI + " when " + CONFIG_FAILURE_POLICY
- + "=save");
- if (Datasets.exists(uri)) {
- dataset = Datasets.load(uri, AvroFlumeEvent.class);
- } else {
- DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schema(AvroFlumeEvent.class)
- .build();
- dataset = Datasets.create(uri, descriptor, AvroFlumeEvent.class);
- }
-
- nEventsHandled = 0;
- }
-
- @Override
- public void handle(Event event, Throwable cause) throws EventDeliveryException {
- try {
- if (writer == null) {
- writer = dataset.newWriter();
- }
-
- final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
- avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
- avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));
-
- writer.write(avroEvent);
- nEventsHandled++;
- } catch (RuntimeException ex) {
- throw new EventDeliveryException(ex);
- }
- }
-
- @Override
- public void sync() throws EventDeliveryException {
- if (nEventsHandled > 0) {
- if (Formats.PARQUET.equals(
- dataset.getDataset().getDescriptor().getFormat())) {
- // We need to close the writer on sync if we're writing to a Parquet
- // dataset
- close();
- } else {
- if (writer instanceof Syncable) {
- ((Syncable) writer).sync();
- }
- }
- }
- }
-
- @Override
- public void close() throws EventDeliveryException {
- if (nEventsHandled > 0) {
- try {
- writer.close();
- } catch (RuntimeException ex) {
- throw new EventDeliveryException(ex);
- } finally {
- writer = null;
- nEventsHandled = 0;
- }
- }
- }
-
- /**
- * Helper function to convert a map of String to a map of CharSequence.
- */
- private static Map<CharSequence, CharSequence> toCharSeqMap(
- Map<String, String> map) {
- return Maps.<CharSequence, CharSequence>newHashMap(map);
- }
-
- public static class Builder implements FailurePolicy.Builder {
-
- @Override
- public FailurePolicy build(Context config) {
- return new SavePolicy(config);
- }
-
- }
-}
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
deleted file mode 100644
index 3709577..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ /dev/null
@@ -1,1036 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.kite;
-
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.flume.sink.kite.parser.EntityParser;
-import org.apache.flume.sink.kite.policy.FailurePolicy;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.DatasetWriter;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.PartitionStrategy;
-import org.kitesdk.data.View;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class TestDatasetSink {
-
- public static final String FILE_REPO_URI = "repo:file:target/test_repo";
- public static final String DATASET_NAME = "test";
- public static final String FILE_DATASET_URI =
- "dataset:file:target/test_repo/" + DATASET_NAME;
- public static final String ERROR_DATASET_URI =
- "dataset:file:target/test_repo/failed_events";
- public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
- public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
- "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
- "{\"name\":\"id\",\"type\":\"string\"}," +
- "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," +
- "\"default\":\"default\"}]}");
- public static final Schema COMPATIBLE_SCHEMA = new Schema.Parser().parse(
- "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
- "{\"name\":\"id\",\"type\":\"string\"}]}");
- public static final Schema INCOMPATIBLE_SCHEMA = new Schema.Parser().parse(
- "{\"type\":\"record\",\"name\":\"user\",\"fields\":[" +
- "{\"name\":\"username\",\"type\":\"string\"}]}");
- public static final Schema UPDATED_SCHEMA = new Schema.Parser().parse(
- "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
- "{\"name\":\"id\",\"type\":\"string\"}," +
- "{\"name\":\"priority\",\"type\":\"int\", \"default\": 0}," +
- "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," +
- "\"default\":\"default\"}]}");
- public static final DatasetDescriptor DESCRIPTOR = new DatasetDescriptor
- .Builder()
- .schema(RECORD_SCHEMA)
- .build();
-
- Context config = null;
- Channel in = null;
- List<GenericRecord> expected = null;
- private static final String DFS_DIR = "target/test/dfs";
- private static final String TEST_BUILD_DATA_KEY = "test.build.data";
- private static String oldTestBuildDataProp = null;
-
- @BeforeClass
- public static void saveSchema() throws IOException {
- oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY);
- System.setProperty(TEST_BUILD_DATA_KEY, DFS_DIR);
- FileWriter schema = new FileWriter(SCHEMA_FILE);
- schema.append(RECORD_SCHEMA.toString());
- schema.close();
- }
-
- @AfterClass
- public static void tearDownClass() {
- FileUtils.deleteQuietly(new File(DFS_DIR));
- if (oldTestBuildDataProp != null) {
- System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp);
- }
- }
-
- @Before
- public void setup() throws EventDeliveryException {
- Datasets.delete(FILE_DATASET_URI);
- Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
-
- this.config = new Context();
- config.put("keep-alive", "0");
- this.in = new MemoryChannel();
- Configurables.configure(in, config);
-
- config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);
-
- GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
- expected = Lists.<GenericRecord>newArrayList(
- builder.set("id", "1").set("msg", "msg1").build(),
- builder.set("id", "2").set("msg", "msg2").build(),
- builder.set("id", "3").set("msg", "msg3").build());
-
- putToChannel(in, Iterables.transform(expected,
- new Function<GenericRecord, Event>() {
- private int i = 0;
-
- @Override
- public Event apply(@Nullable GenericRecord rec) {
- this.i += 1;
- boolean useURI = (i % 2) == 0;
- return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI);
- }
- }));
- }
-
- @After
- public void teardown() {
- Datasets.delete(FILE_DATASET_URI);
- }
-
- @Test
- public void testOldConfig() throws EventDeliveryException {
- config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, null);
- config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI);
- config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME);
-
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testDatasetUriOverridesOldConfig() throws EventDeliveryException {
- // CONFIG_KITE_DATASET_URI is still set, otherwise this will cause an error
- config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, "bad uri");
- config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "");
-
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testFileStore()
- throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException {
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testParquetDataset() throws EventDeliveryException {
- Datasets.delete(FILE_DATASET_URI);
- Dataset<GenericRecord> created = Datasets.create(FILE_DATASET_URI,
- new DatasetDescriptor.Builder(DESCRIPTOR)
- .format("parquet")
- .build());
-
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- // the transaction should not commit during the call to process
- assertThrows("Transaction should still be open", IllegalStateException.class,
- new Callable() {
- @Override
- public Object call() throws EventDeliveryException {
- in.getTransaction().begin();
- return null;
- }
- });
- // The records won't commit until the call to stop()
- Assert.assertEquals("Should not have committed", 0, read(created).size());
-
- sink.stop();
-
- Assert.assertEquals(Sets.newHashSet(expected), read(created));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testPartitionedData() throws EventDeliveryException {
- URI partitionedUri = URI.create("dataset:file:target/test_repo/partitioned");
- try {
- Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR)
- .partitionStrategy(new PartitionStrategy.Builder()
- .identity("id", 10) // partition by id
- .build())
- .build());
-
- config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
- partitionedUri.toString());
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(partitionedUri)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- } finally {
- if (Datasets.exists(partitionedUri)) {
- Datasets.delete(partitionedUri);
- }
- }
- }
-
- @Test
- public void testStartBeforeDatasetCreated() throws EventDeliveryException {
- // delete the dataset created by setup
- Datasets.delete(FILE_DATASET_URI);
-
- DatasetSink sink = sink(in, config);
-
- // start the sink
- sink.start();
-
- // run the sink without a target dataset
- try {
- sink.process();
- Assert.fail("Should have thrown an exception: no such dataset");
- } catch (EventDeliveryException e) {
- // expected
- }
-
- // create the target dataset
- Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
-
- // run the sink
- sink.process();
- sink.stop();
-
- Assert.assertEquals(Sets.newHashSet(expected), read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testDatasetUpdate() throws EventDeliveryException {
- // add an updated record that is missing the msg field
- GenericRecordBuilder updatedBuilder = new GenericRecordBuilder(UPDATED_SCHEMA);
- GenericData.Record updatedRecord = updatedBuilder
- .set("id", "0")
- .set("priority", 1)
- .set("msg", "Priority 1 message!")
- .build();
-
- // make a set of the expected records with the new schema
- Set<GenericRecord> expectedAsUpdated = Sets.newHashSet();
- for (GenericRecord record : expected) {
- expectedAsUpdated.add(updatedBuilder
- .clear("priority")
- .set("id", record.get("id"))
- .set("msg", record.get("msg"))
- .build());
- }
- expectedAsUpdated.add(updatedRecord);
-
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- // update the dataset's schema
- DatasetDescriptor updated = new DatasetDescriptor
- .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor())
- .schema(UPDATED_SCHEMA)
- .build();
- Datasets.update(FILE_DATASET_URI, updated);
-
- // trigger a roll on the next process call to refresh the writer
- sink.roll();
-
- // add the record to the incoming channel and the expected list
- putToChannel(in, event(updatedRecord, UPDATED_SCHEMA, null, false));
-
- // process events with the updated schema
- sink.process();
- sink.stop();
-
- Assert.assertEquals(expectedAsUpdated, read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testMiniClusterStore() throws EventDeliveryException, IOException {
- // setup a minicluster
- MiniDFSCluster cluster = new MiniDFSCluster
- .Builder(new Configuration())
- .build();
-
- FileSystem dfs = cluster.getFileSystem();
- Configuration conf = dfs.getConf();
-
- URI hdfsUri = URI.create(
- "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo" + DATASET_NAME);
- try {
- // create a repository and dataset in HDFS
- Datasets.create(hdfsUri, DESCRIPTOR);
-
- // update the config to use the HDFS repository
- config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, hdfsUri.toString());
-
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(hdfsUri)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
-
- } finally {
- if (Datasets.exists(hdfsUri)) {
- Datasets.delete(hdfsUri);
- }
- cluster.shutdown();
- }
- }
-
- @Test
- public void testBatchSize() throws EventDeliveryException {
- DatasetSink sink = sink(in, config);
-
- // release one record per process call
- config.put("kite.batchSize", "2");
- Configurables.configure(sink, config);
-
- sink.start();
- sink.process(); // process the first and second
- sink.roll(); // roll at the next process call
- sink.process(); // roll and process the third
- Assert.assertEquals(
- Sets.newHashSet(expected.subList(0, 2)),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- sink.roll(); // roll at the next process call
- sink.process(); // roll, the channel is empty
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- sink.stop();
- }
-
- @Test
- public void testTimedFileRolling()
- throws EventDeliveryException, InterruptedException {
- // use a new roll interval
- config.put("kite.rollInterval", "1"); // in seconds
-
- DatasetSink sink = sink(in, config);
-
- Dataset<GenericRecord> records = Datasets.load(FILE_DATASET_URI);
-
- // run the sink
- sink.start();
- sink.process();
-
- Assert.assertEquals("Should have committed", 0, remaining(in));
-
- Thread.sleep(1100); // sleep longer than the roll interval
- sink.process(); // rolling happens in the process method
-
- Assert.assertEquals(Sets.newHashSet(expected), read(records));
-
- // wait until the end to stop because it would close the files
- sink.stop();
- }
-
- @Test
- public void testCompatibleSchemas() throws EventDeliveryException {
- DatasetSink sink = sink(in, config);
-
- // add a compatible record that is missing the msg field
- GenericRecordBuilder compatBuilder = new GenericRecordBuilder(
- COMPATIBLE_SCHEMA);
- GenericData.Record compatibleRecord = compatBuilder.set("id", "0").build();
-
- // add the record to the incoming channel
- putToChannel(in, event(compatibleRecord, COMPATIBLE_SCHEMA, null, false));
-
- // the record will be read using the real schema, so create the expected
- // record using it, but without any data
-
- GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
- GenericData.Record expectedRecord = builder.set("id", "0").build();
- expected.add(expectedRecord);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testIncompatibleSchemas() throws EventDeliveryException {
- final DatasetSink sink = sink(in, config);
-
- GenericRecordBuilder builder = new GenericRecordBuilder(
- INCOMPATIBLE_SCHEMA);
- GenericData.Record rec = builder.set("username", "koala").build();
- putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, null, false));
-
- // run the sink
- sink.start();
- assertThrows("Should fail", EventDeliveryException.class,
- new Callable() {
- @Override
- public Object call() throws EventDeliveryException {
- sink.process();
- return null;
- }
- });
- sink.stop();
-
- Assert.assertEquals("Should have rolled back",
- expected.size() + 1, remaining(in));
- }
-
- @Test
- public void testMissingSchema() throws EventDeliveryException {
- final DatasetSink sink = sink(in, config);
-
- Event badEvent = new SimpleEvent();
- badEvent.setHeaders(Maps.<String, String>newHashMap());
- badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
- putToChannel(in, badEvent);
-
- // run the sink
- sink.start();
- assertThrows("Should fail", EventDeliveryException.class,
- new Callable() {
- @Override
- public Object call() throws EventDeliveryException {
- sink.process();
- return null;
- }
- });
- sink.stop();
-
- Assert.assertEquals("Should have rolled back",
- expected.size() + 1, remaining(in));
- }
-
- @Test
- public void testFileStoreWithSavePolicy() throws EventDeliveryException {
- if (Datasets.exists(ERROR_DATASET_URI)) {
- Datasets.delete(ERROR_DATASET_URI);
- }
- config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
- DatasetSinkConstants.SAVE_FAILURE_POLICY);
- config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
- ERROR_DATASET_URI);
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testMissingSchemaWithSavePolicy() throws EventDeliveryException {
- if (Datasets.exists(ERROR_DATASET_URI)) {
- Datasets.delete(ERROR_DATASET_URI);
- }
- config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
- DatasetSinkConstants.SAVE_FAILURE_POLICY);
- config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
- ERROR_DATASET_URI);
- final DatasetSink sink = sink(in, config);
-
- Event badEvent = new SimpleEvent();
- badEvent.setHeaders(Maps.<String, String>newHashMap());
- badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
- putToChannel(in, badEvent);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals("Good records should have been written",
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should not have rolled back", 0, remaining(in));
- Assert.assertEquals("Should have saved the bad event",
- Sets.newHashSet(AvroFlumeEvent.newBuilder()
- .setBody(ByteBuffer.wrap(badEvent.getBody()))
- .setHeaders(toUtf8Map(badEvent.getHeaders()))
- .build()),
- read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
- }
-
- @Test
- public void testSerializedWithIncompatibleSchemasWithSavePolicy()
- throws EventDeliveryException {
- if (Datasets.exists(ERROR_DATASET_URI)) {
- Datasets.delete(ERROR_DATASET_URI);
- }
- config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
- DatasetSinkConstants.SAVE_FAILURE_POLICY);
- config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
- ERROR_DATASET_URI);
- final DatasetSink sink = sink(in, config);
-
- GenericRecordBuilder builder = new GenericRecordBuilder(
- INCOMPATIBLE_SCHEMA);
- GenericData.Record rec = builder.set("username", "koala").build();
-
- // We pass in a valid schema in the header, but an incompatible schema
- // was used to serialize the record
- Event badEvent = event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true);
- putToChannel(in, badEvent);
-
- // run the sink
- sink.start();
- sink.process();
- sink.stop();
-
- Assert.assertEquals("Good records should have been written",
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- Assert.assertEquals("Should not have rolled back", 0, remaining(in));
- Assert.assertEquals("Should have saved the bad event",
- Sets.newHashSet(AvroFlumeEvent.newBuilder()
- .setBody(ByteBuffer.wrap(badEvent.getBody()))
- .setHeaders(toUtf8Map(badEvent.getHeaders()))
- .build()),
- read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
- }
-
- @Test
- public void testSerializedWithIncompatibleSchemas() throws EventDeliveryException {
- final DatasetSink sink = sink(in, config);
-
- GenericRecordBuilder builder = new GenericRecordBuilder(
- INCOMPATIBLE_SCHEMA);
- GenericData.Record rec = builder.set("username", "koala").build();
-
- // We pass in a valid schema in the header, but an incompatible schema
- // was used to serialize the record
- putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true));
-
- // run the sink
- sink.start();
- assertThrows("Should fail", EventDeliveryException.class,
- new Callable() {
- @Override
- public Object call() throws EventDeliveryException {
- sink.process();
- return null;
- }
- });
- sink.stop();
-
- Assert.assertEquals("Should have rolled back",
- expected.size() + 1, remaining(in));
- }
-
- @Test
- public void testCommitOnBatch() throws EventDeliveryException {
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- // the transaction should commit during the call to process
- Assert.assertEquals("Should have committed", 0, remaining(in));
- // but the data won't be visible yet
- Assert.assertEquals(0,
- read(Datasets.load(FILE_DATASET_URI)).size());
-
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- }
-
- @Test
- public void testCommitOnBatchFalse() throws EventDeliveryException {
- config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
- Boolean.toString(false));
- config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
- Boolean.toString(false));
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- // the transaction should not commit during the call to process
- assertThrows("Transaction should still be open", IllegalStateException.class,
- new Callable() {
- @Override
- public Object call() throws EventDeliveryException {
- in.getTransaction().begin();
- return null;
- }
- });
-
- // the data won't be visible
- Assert.assertEquals(0,
- read(Datasets.load(FILE_DATASET_URI)).size());
-
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- // the transaction should commit during the call to stop
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- @Test
- public void testCommitOnBatchFalseSyncOnBatchTrue() throws EventDeliveryException {
- config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
- Boolean.toString(false));
- config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
- Boolean.toString(true));
-
- try {
- sink(in, config);
- Assert.fail("Should have thrown IllegalArgumentException");
- } catch (IllegalArgumentException ex) {
- // expected
- }
- }
-
- @Test
- public void testCloseAndCreateWriter() throws EventDeliveryException {
- config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
- Boolean.toString(false));
- config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
- Boolean.toString(false));
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- sink.closeWriter();
- sink.commitTransaction();
- sink.createWriter();
-
- Assert.assertNotNull("Writer should not be null", sink.getWriter());
- Assert.assertEquals("Should have committed", 0, remaining(in));
-
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- }
-
- @Test
- public void testCloseWriter() throws EventDeliveryException {
- config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
- Boolean.toString(false));
- config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
- Boolean.toString(false));
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- sink.closeWriter();
- sink.commitTransaction();
-
- Assert.assertNull("Writer should be null", sink.getWriter());
- Assert.assertEquals("Should have committed", 0, remaining(in));
-
- sink.stop();
-
- Assert.assertEquals(
- Sets.newHashSet(expected),
- read(Datasets.load(FILE_DATASET_URI)));
- }
-
- @Test
- public void testCreateWriter() throws EventDeliveryException {
- config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
- Boolean.toString(false));
- config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
- Boolean.toString(false));
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- sink.commitTransaction();
- sink.createWriter();
- Assert.assertNotNull("Writer should not be null", sink.getWriter());
- Assert.assertEquals("Should have committed", 0, remaining(in));
-
- sink.stop();
-
- Assert.assertEquals(0, read(Datasets.load(FILE_DATASET_URI)).size());
- }
-
- @Test
- public void testAppendWriteExceptionInvokesPolicy()
- throws EventDeliveryException, NonRecoverableEventException {
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- // Mock an Event
- Event mockEvent = mock(Event.class);
- when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
-
- // Mock a GenericRecord
- GenericRecord mockRecord = mock(GenericRecord.class);
-
- // Mock an EntityParser
- EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
- when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
- .thenReturn(mockRecord);
- sink.setParser(mockParser);
-
- // Mock a FailurePolicy
- FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
- sink.setFailurePolicy(mockFailurePolicy);
-
- // Mock a DatasetWriter
- DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
- doThrow(new DataFileWriter.AppendWriteException(new IOException()))
- .when(mockWriter).write(mockRecord);
-
- sink.setWriter(mockWriter);
- sink.write(mockEvent);
-
- // Verify that the event was sent to the failure policy
- verify(mockFailurePolicy).handle(eq(mockEvent), any(Throwable.class));
-
- sink.stop();
- }
-
- @Test
- public void testRuntimeExceptionThrowsEventDeliveryException()
- throws EventDeliveryException, NonRecoverableEventException {
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- // Mock an Event
- Event mockEvent = mock(Event.class);
- when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
-
- // Mock a GenericRecord
- GenericRecord mockRecord = mock(GenericRecord.class);
-
- // Mock an EntityParser
- EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
- when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
- .thenReturn(mockRecord);
- sink.setParser(mockParser);
-
- // Mock a FailurePolicy
- FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
- sink.setFailurePolicy(mockFailurePolicy);
-
- // Mock a DatasetWriter
- DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
- doThrow(new RuntimeException()).when(mockWriter).write(mockRecord);
-
- sink.setWriter(mockWriter);
-
- try {
- sink.write(mockEvent);
- Assert.fail("Should throw EventDeliveryException");
- } catch (EventDeliveryException ex) {
-
- }
-
- // Verify that the event was not sent to the failure policy
- verify(mockFailurePolicy, never()).handle(eq(mockEvent), any(Throwable.class));
-
- sink.stop();
- }
-
- @Test
- public void testProcessHandlesNullWriter() throws EventDeliveryException,
- NonRecoverableEventException, NonRecoverableEventException {
- DatasetSink sink = sink(in, config);
-
- // run the sink
- sink.start();
- sink.process();
-
- // explicitly set the writer to null
- sink.setWriter(null);
-
- // this should not throw an NPE
- sink.process();
-
- sink.stop();
-
- Assert.assertEquals("Should have committed", 0, remaining(in));
- }
-
- public static DatasetSink sink(Channel in, Context config) {
- DatasetSink sink = new DatasetSink();
- sink.setChannel(in);
- Configurables.configure(sink, config);
- return sink;
- }
-
- public static <T> HashSet<T> read(View<T> view) {
- DatasetReader<T> reader = null;
- try {
- reader = view.newReader();
- return Sets.newHashSet(reader.iterator());
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- }
-
- public static int remaining(Channel ch) throws EventDeliveryException {
- Transaction t = ch.getTransaction();
- try {
- t.begin();
- int count = 0;
- while (ch.take() != null) {
- count += 1;
- }
- t.commit();
- return count;
- } catch (Throwable th) {
- t.rollback();
- Throwables.propagateIfInstanceOf(th, Error.class);
- Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
- throw new EventDeliveryException(th);
- } finally {
- t.close();
- }
- }
-
- public static void putToChannel(Channel in, Event... records)
- throws EventDeliveryException {
- putToChannel(in, Arrays.asList(records));
- }
-
- public static void putToChannel(Channel in, Iterable<Event> records)
- throws EventDeliveryException {
- Transaction t = in.getTransaction();
- try {
- t.begin();
- for (Event record : records) {
- in.put(record);
- }
- t.commit();
- } catch (Throwable th) {
- t.rollback();
- Throwables.propagateIfInstanceOf(th, Error.class);
- Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
- throw new EventDeliveryException(th);
- } finally {
- t.close();
- }
- }
-
- public static Event event(
- Object datum, Schema schema, File file, boolean useURI) {
- Map<String, String> headers = Maps.newHashMap();
- if (useURI) {
- headers.put(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER,
- file.getAbsoluteFile().toURI().toString());
- } else {
- headers.put(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER,
- schema.toString());
- }
- Event e = new SimpleEvent();
- e.setBody(serialize(datum, schema));
- e.setHeaders(headers);
- return e;
- }
-
- @SuppressWarnings("unchecked")
- public static byte[] serialize(Object datum, Schema schema) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
- ReflectDatumWriter writer = new ReflectDatumWriter(schema);
- try {
- writer.write(datum, encoder);
- encoder.flush();
- } catch (IOException ex) {
- Throwables.propagate(ex);
- }
- return out.toByteArray();
- }
-
- /**
- * A convenience method to avoid a large number of @Test(expected=...) tests.
- *
- * This variant uses a Callable, which is allowed to throw checked Exceptions.
- *
- * @param message A String message to describe this assertion
- * @param expected An Exception class that the Runnable should throw
- * @param callable A Callable that is expected to throw the exception
- */
- public static void assertThrows(
- String message, Class<? extends Exception> expected, Callable callable) {
- try {
- callable.call();
- Assert.fail("No exception was thrown (" + message + "), expected: " +
- expected.getName());
- } catch (Exception actual) {
- Assert.assertEquals(message, expected, actual.getClass());
- }
- }
-
- /**
- * Helper function to convert a map of String to a map of Utf8.
- *
- * @param map A Map of String to String
- * @return The same mappings converting the {@code String}s to {@link Utf8}s
- */
- public static Map<CharSequence, CharSequence> toUtf8Map(
- Map<String, String> map) {
- Map<CharSequence, CharSequence> utf8Map = Maps.newHashMap();
- for (Map.Entry<String, String> entry : map.entrySet()) {
- utf8Map.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
- }
- return utf8Map;
- }
-}
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/resources/enable-kerberos.xml b/flume-ng-sinks/flume-dataset-sink/src/test/resources/enable-kerberos.xml
deleted file mode 100644
index 85b0447..0000000
--- a/flume-ng-sinks/flume-dataset-sink/src/test/resources/enable-kerberos.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Copyright 2014 Apache Software Foundation.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-<configuration>
-
- <property>
- <name>hadoop.security.authentication</name>
- <value>kerberos</value>
- </property>
-
- <property>
- <name>hadoop.security.authorization</name>
- <value>true</value>
- </property>
-
-</configuration>
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
index 5f66e76..8e8ea83 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
@@ -60,11 +60,6 @@
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
index ea6a93c..0a445a6 100644
--- a/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
@@ -156,11 +156,6 @@
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index cb8a038..97dc0bd 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -55,6 +55,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -183,8 +184,15 @@ public class TestKafkaSink {
ConsumerRecords recs = pollConsumerRecords(topic);
assertNotNull(recs);
assertTrue(recs.count() > 0);
- ConsumerRecord consumerRecord = (ConsumerRecord) recs.records(topic).iterator().next();
- assertEquals(msg, consumerRecord.value());
+ Iterator<ConsumerRecord> iter = recs.records(topic).iterator();
+ boolean match = false;
+ while (iter.hasNext()) {
+ if (msg.equals(iter.next().value())) {
+ match = true;
+ break;
+ }
+ }
+ assertTrue("No message matches " + msg, match);
}
@Test
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
index cdf9bad..1a87dc5 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -19,7 +19,7 @@
package org.apache.flume.sink.kafka.util;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -165,14 +165,19 @@ public class TestUtil {
NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
newTopics.add(newTopic);
}
- getAdminClient().createTopics(newTopics);
-
- //the following lines are a bit of black magic to ensure the topic is ready when we return
- DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames);
- try {
- dtr.all().get(10, TimeUnit.SECONDS);
- } catch (Exception e) {
- throw new RuntimeException("Error getting topic info", e);
+ CreateTopicsResult result = getAdminClient().createTopics(newTopics);
+ Throwable throwable = null;
+ for (int i = 0; i < 10; ++i) {
+ try {
+ result.all().get(1, TimeUnit.SECONDS);
+ throwable = null;
+ break;
+ } catch (Exception e) {
+ throwable = e;
+ }
+ }
+ if (throwable != null) {
+ throw new RuntimeException("Error getting topic info", throwable);
}
}
public void deleteTopic(String topicName) {
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index ae9e306..2ecc140 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -43,7 +43,6 @@ limitations under the License.
<module>flume-ng-morphline-solr-sink</module>
<module>flume-ng-kafka-sink</module>
<module>flume-http-sink</module>
- <module>flume-dataset-sink</module>
<module>flume-hive-sink</module>
</modules>
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 56a582a..0799664 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -20,7 +20,7 @@ import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -141,14 +141,19 @@ public class KafkaSourceEmbeddedKafka {
public void createTopic(String topicName, int numPartitions) {
AdminClient adminClient = getAdminClient();
NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
- adminClient.createTopics(Collections.singletonList(newTopic));
-
- //the following lines are a bit of black magic to ensure the topic is ready when we return
- DescribeTopicsResult dtr = adminClient.describeTopics(Collections.singletonList(topicName));
- try {
- dtr.all().get(10, TimeUnit.SECONDS);
- } catch (Exception e) {
- throw new RuntimeException("Error getting topic info", e);
+ CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
+ Throwable throwable = null;
+ for (int i = 0; i < 10; ++i) {
+ try {
+ result.all().get(1, TimeUnit.SECONDS);
+ throwable = null;
+ break;
+ } catch (Exception e) {
+ throwable = e;
+ }
+ }
+ if (throwable != null) {
+ throw new RuntimeException("Error getting topic info", throwable);
}
}
diff --git a/pom.xml b/pom.xml
index 091d121..fa228bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@ limitations under the License.
<activemq.version>5.7.0</activemq.version>
<asynchbase.version>1.7.0</asynchbase.version>
- <avro.version>1.7.7</avro.version>
+ <avro.version>1.9.2</avro.version>
<bundle-plugin.version>2.3.7</bundle-plugin.version>
<checkstyle.tool.version>8.45.1</checkstyle.tool.version>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
@@ -506,7 +506,7 @@ limitations under the License.
<configuration>
<reuseForks>false</reuseForks>
<forkCount>1</forkCount>
- <rerunFailingTestsCount>10</rerunFailingTestsCount>
+ <!--<rerunFailingTestsCount>10</rerunFailingTestsCount>-->
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine>-Djava.net.preferIPv4Stack=true</argLine>
@@ -827,14 +827,10 @@ limitations under the License.
<dependency>
<groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
+ <artifactId>avro-ipc-netty</artifactId>
<version>${avro.version}</version>
<exclusions>
<exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
@@ -842,6 +838,18 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc-jetty</artifactId>
+ <version>${avro.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
@@ -1363,53 +1371,6 @@ limitations under the License.
<artifactId>scala-library</artifactId>
<version>${scala-library.version}</version>
</dependency>
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-core</artifactId>
- <version>${kite.version}</version>
- </dependency>
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-hive</artifactId>
- <version>${kite.version}</version>
- </dependency>
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-hbase</artifactId>
- <version>${kite.version}</version>
- </dependency>
-
- <!-- Dependency for kite-data-hive -->
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>${hive.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-metastore</artifactId>
- <version>${hive.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>