Repository: flume Updated Branches: refs/heads/flume-1.6 a52545871 -> 4e840400f
FLUME-2633: Update Kite dependency to 1.0.0 (Tom White via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4e840400 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4e840400 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4e840400 Branch: refs/heads/flume-1.6 Commit: 4e840400fa3854d1cfdf70fc4f9f850e20675d8b Parents: a525458 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Feb 25 09:51:59 2015 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Feb 25 09:52:43 2015 -0800 ---------------------------------------------------------------------- flume-ng-sinks/flume-dataset-sink/pom.xml | 2 +- .../java/org/apache/flume/sink/kite/DatasetSink.java | 13 ++++++------- .../apache/flume/sink/kite/policy/FailurePolicy.java | 4 ++-- .../org/apache/flume/sink/kite/policy/SavePolicy.java | 5 ++++- .../org/apache/flume/sink/kite/TestDatasetSink.java | 8 ++++---- pom.xml | 4 ++-- 6 files changed, 19 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4e840400/flume-ng-sinks/flume-dataset-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml index e929d60..ad3f603 100644 --- a/flume-ng-sinks/flume-dataset-sink/pom.xml +++ b/flume-ng-sinks/flume-dataset-sink/pom.xml @@ -93,7 +93,7 @@ limitations under the License. <dependency> <groupId>org.kitesdk</groupId> - <artifactId>kite-data-hcatalog</artifactId> + <artifactId>kite-data-hive</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/4e840400/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index 3e66532..fd9f991 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -46,8 +46,9 @@ import org.kitesdk.data.DatasetDescriptor; import org.kitesdk.data.DatasetIOException; import org.kitesdk.data.DatasetNotFoundException; import org.kitesdk.data.DatasetWriter; -import org.kitesdk.data.DatasetWriterException; import org.kitesdk.data.Datasets; +import org.kitesdk.data.Flushable; +import org.kitesdk.data.Syncable; import org.kitesdk.data.View; import org.kitesdk.data.spi.Registration; import org.kitesdk.data.URIBuilder; @@ -305,10 +306,10 @@ public class DatasetSink extends AbstractSink implements Configurable { if (commitOnBatch) { // Flush/sync before commiting. A failure here will result in rolling back // the transaction - if (syncOnBatch) { - writer.sync(); - } else { - writer.flush(); + if (syncOnBatch && writer instanceof Syncable) { + ((Syncable) writer).sync(); + } else if (writer instanceof Flushable) { + ((Flushable) writer).flush(); } boolean committed = commitTransaction(); Preconditions.checkState(committed, @@ -484,8 +485,6 @@ public class DatasetSink extends AbstractSink implements Configurable { throw new EventDeliveryException("Check HDFS permissions/health. IO" + " error trying to close the writer for dataset " + datasetUri, ex); - } catch (DatasetWriterException ex) { - throw new EventDeliveryException("Failure moving temp file.", ex); } catch (RuntimeException ex) { throw new EventDeliveryException("Error trying to close the writer for" + " dataset " + datasetUri, ex); http://git-wip-us.apache.org/repos/asf/flume/blob/4e840400/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java index 47b6a25..f6f875a 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java @@ -22,7 +22,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.sink.kite.DatasetSink; -import org.kitesdk.data.DatasetWriter; +import org.kitesdk.data.Syncable; /** * A policy for dealing with non-recoverable event delivery failures. @@ -68,7 +68,7 @@ public interface FailurePolicy { * This allows the policy implementation to sync any data that it may not * have fully handled. * - * See {@link DatasetWriter#sync()}. + * See {@link Syncable#sync()}. * * @throws EventDeliveryException The policy failed while syncing data. * When this is thrown, the Flume transaction http://git-wip-us.apache.org/repos/asf/flume/blob/4e840400/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java index ed47898..bd537ec 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java @@ -30,6 +30,7 @@ import org.kitesdk.data.DatasetDescriptor; import org.kitesdk.data.DatasetWriter; import org.kitesdk.data.Datasets; import org.kitesdk.data.Formats; +import org.kitesdk.data.Syncable; import org.kitesdk.data.View; import static org.apache.flume.sink.kite.DatasetSinkConstants.*; @@ -87,7 +88,9 @@ public class SavePolicy implements FailurePolicy { // dataset close(); } else { - writer.sync(); + if (writer instanceof Syncable) { + ((Syncable) writer).sync(); + } } } } http://git-wip-us.apache.org/repos/asf/flume/blob/4e840400/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java index 58aa467..621920d 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java @@ -78,12 +78,12 @@ import static org.mockito.Mockito.*; public class TestDatasetSink { - public static final String FILE_REPO_URI = "repo:file:target/test-repo"; + public static final String FILE_REPO_URI = "repo:file:target/test_repo"; public static final String DATASET_NAME = "test"; public static final String FILE_DATASET_URI = - "dataset:file:target/test-repo/" + DATASET_NAME; + "dataset:file:target/test_repo/" + DATASET_NAME; public static final String ERROR_DATASET_URI = - "dataset:file:target/test-repo/failed-events"; + "dataset:file:target/test_repo/failed_events"; public static final File SCHEMA_FILE = new File("target/record-schema.avsc"); public static final Schema RECORD_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" + @@ -254,7 +254,7 @@ public class TestDatasetSink { @Test public void testPartitionedData() throws EventDeliveryException { - URI partitionedUri = URI.create("dataset:file:target/test-repo/partitioned"); + URI partitionedUri = URI.create("dataset:file:target/test_repo/partitioned"); try { Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR) .partitionStrategy(new PartitionStrategy.Builder() http://git-wip-us.apache.org/repos/asf/flume/blob/4e840400/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ea7ffe3..3e40558 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ limitations under the License. <elasticsearch.version>0.90.1</elasticsearch.version> <hadoop2.version>2.4.0</hadoop2.version> <thrift.version>0.7.0</thrift.version> - <kite.version>0.17.1</kite.version> + <kite.version>1.0.0</kite.version> <hive.version>0.13.1</hive.version> <xalan.verion>2.7.1</xalan.verion> <xerces.version>2.9.1</xerces.version> @@ -1328,7 +1328,7 @@ limitations under the License. </dependency> <dependency> <groupId>org.kitesdk</groupId> - <artifactId>kite-data-hcatalog</artifactId> + <artifactId>kite-data-hive</artifactId> <version>${kite.version}</version> </dependency> <dependency>
