FLUME-2591. DatasetSink 2.0 (Joey Echeverria via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/914106c0 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/914106c0 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/914106c0 Branch: refs/heads/flume-1.6 Commit: 914106c0f12650a7b16ba565e2aaddaad3d95540 Parents: 13a0300 Author: Hari Shreedharan <[email protected]> Authored: Wed Jan 28 17:41:51 2015 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Wed Jan 28 17:41:51 2015 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 62 +- .../org/apache/flume/sink/kite/DatasetSink.java | 650 ++++++++++++------- .../flume/sink/kite/DatasetSinkConstants.java | 74 ++- .../sink/kite/NonRecoverableEventException.java | 54 ++ .../flume/sink/kite/parser/AvroParser.java | 208 ++++++ .../flume/sink/kite/parser/EntityParser.java | 56 ++ .../sink/kite/parser/EntityParserFactory.java | 82 +++ .../flume/sink/kite/policy/FailurePolicy.java | 105 +++ .../sink/kite/policy/FailurePolicyFactory.java | 82 +++ .../flume/sink/kite/policy/RetryPolicy.java | 63 ++ .../flume/sink/kite/policy/SavePolicy.java | 125 ++++ .../apache/flume/sink/kite/TestDatasetSink.java | 411 +++++++++++- pom.xml | 2 +- 13 files changed, 1730 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index bcadc2d..7a1dfce 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2158,23 +2158,51 @@ Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is negligible.
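For reference, a sink using these options might be configured along the following lines. This is only an illustrative sketch: the agent name, channel name, and dataset URIs are made up, while the property names and the ``type`` value come from the table below.

a1.channels = kite-channel
a1.sinks = kite-sink
a1.sinks.kite-sink.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.kite-sink.channel = kite-channel
a1.sinks.kite-sink.kite.dataset.uri = dataset:hdfs://namenode/apps/flume/events
a1.sinks.kite-sink.kite.batchSize = 100
a1.sinks.kite-sink.kite.rollInterval = 30
a1.sinks.kite-sink.kite.entityParser = avro
a1.sinks.kite-sink.kite.failurePolicy = save
a1.sinks.kite-sink.kite.error.dataset.uri = dataset:hdfs://namenode/apps/flume/errors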
-======================= ======= =========================================================== -Property Name Default Description -======================= ======= =========================================================== -**channel** -- -**type** -- Must be org.apache.flume.sink.kite.DatasetSink -**kite.dataset.uri** -- URI of the dataset to open -kite.repo.uri -- URI of the repository to open - (deprecated; use kite.dataset.uri instead) -kite.dataset.name -- Name of the Dataset where records will be written - (deprecated; use kite.dataset.uri instead) -kite.batchSize 100 Number of records to process in each batch -kite.rollInterval 30 Maximum wait time (seconds) before data files are released -auth.kerberosPrincipal -- Kerberos user principal for secure authentication to HDFS -auth.kerberosKeytab -- Kerberos keytab location (local FS) for the principal -auth.proxyUser -- The effective user for HDFS actions, if different from - the kerberos principal -======================= ======= =========================================================== +============================ ======= =========================================================== +Property Name Default Description +============================ ======= =========================================================== +**channel** -- +**type** -- Must be org.apache.flume.sink.kite.DatasetSink +**kite.dataset.uri** -- URI of the dataset to open +kite.repo.uri -- URI of the repository to open + (deprecated; use kite.dataset.uri instead) +kite.dataset.namespace -- Namespace of the Dataset where records will be written + (deprecated; use kite.dataset.uri instead) +kite.dataset.name -- Name of the Dataset where records will be written + (deprecated; use kite.dataset.uri instead) +kite.batchSize 100 Number of records to process in each batch +kite.rollInterval 30 Maximum wait time (seconds) before data files are released +kite.flushable.commitOnBatch true If ``true``, the Flume transaction will be committed and the + writer will be flushed on each batch of ``kite.batchSize`` + records. This setting only applies to flushable datasets. When + ``true``, it's possible for temp files with committed data to be + left in the dataset directory. These files need to be recovered + by hand for the data to be visible to DatasetReaders. +kite.syncable.syncOnBatch true Controls whether the sink will also sync data when committing + the transaction. This setting only applies to syncable datasets. + Syncing guarantees that data will be written on stable storage + on the remote system while flushing only guarantees that data + has left Flume's client buffers. When the + ``kite.flushable.commitOnBatch`` property is set to ``false``, + this property must also be set to ``false``. +kite.entityParser avro Parser that turns Flume ``Events`` into Kite entities. + Valid values are ``avro`` and the fully-qualified class name + of an implementation of the ``EntityParser.Builder`` interface. +kite.failurePolicy retry Policy that handles non-recoverable errors such as a missing + ``Schema`` in the ``Event`` header. The default value, ``retry``, + will fail the current batch and try again, which matches the old + behavior. Other valid values are ``save``, which will write the + raw ``Event`` to the ``kite.error.dataset.uri`` dataset, and the + fully-qualified class name of an implementation of the + ``FailurePolicy.Builder`` interface. +kite.error.dataset.uri -- URI of the dataset where failed events are saved when + ``kite.failurePolicy`` is set to ``save``.
**Required** when + the ``kite.failurePolicy`` is set to ``save``. +auth.kerberosPrincipal -- Kerberos user principal for secure authentication to HDFS +auth.kerberosKeytab -- Kerberos keytab location (local FS) for the principal +auth.proxyUser -- The effective user for HDFS actions, if different from + the kerberos principal +============================ ======= =========================================================== Kafka Sink http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index ebcc617..3e66532 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -15,31 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flume.sink.kite; +import org.apache.flume.sink.kite.parser.EntityParserFactory; +import org.apache.flume.sink.kite.parser.EntityParser; +import org.apache.flume.sink.kite.policy.FailurePolicy; +import org.apache.flume.sink.kite.policy.FailurePolicyFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; -import java.io.IOException; -import java.io.InputStream; import java.net.URI; -import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -48,144 +40,180 @@ import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.kitesdk.data.Dataset; import org.kitesdk.data.DatasetDescriptor; -import org.kitesdk.data.DatasetException; +import org.kitesdk.data.DatasetIOException; +import org.kitesdk.data.DatasetNotFoundException; import org.kitesdk.data.DatasetWriter; +import org.kitesdk.data.DatasetWriterException; import org.kitesdk.data.Datasets; import org.kitesdk.data.View; import org.kitesdk.data.spi.Registration; -import org.kitesdk.data.spi.URIBuilder; +import org.kitesdk.data.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.flume.sink.kite.DatasetSinkConstants.*; +import org.kitesdk.data.Format; +import 
org.kitesdk.data.Formats; + /** - * Experimental sink that writes events to a Kite Dataset. This sink will - * deserialize the body of each incoming event and store the resulting record - * in a Kite Dataset. It determines target Dataset by opening a repository URI, - * {@code kite.repo.uri}, and loading a Dataset by name, - * {@code kite.dataset.name}. + * Sink that writes events to a Kite Dataset. This sink will parse the body of + * each incoming event and store the resulting entity in a Kite Dataset. It + * determines the destination Dataset by opening a dataset URI + * {@code kite.dataset.uri} or opening a repository URI, {@code kite.repo.uri}, + * and loading a Dataset by name, {@code kite.dataset.name}, and namespace, + * {@code kite.dataset.namespace}. */ public class DatasetSink extends AbstractSink implements Configurable { private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class); - static Configuration conf = new Configuration(); + private Context context = null; + private UserGroupInformation login = null; private String datasetName = null; - private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE; - - private URI target = null; - private Schema targetSchema = null; + private URI datasetUri = null; + private Schema datasetSchema = null; private DatasetWriter<GenericRecord> writer = null; - private UserGroupInformation login = null; - private SinkCounter counter = null; - // for rolling files at a given interval - private int rollIntervalS = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL; - private long lastRolledMs = 0l; + /** + * The number of events to process as a single batch. + */ + private long batchSize = DEFAULT_BATCH_SIZE; + + /** + * The number of seconds to wait before rolling a writer. + */ + private int rollIntervalSeconds = DEFAULT_ROLL_INTERVAL; + + /** + * Flag that says if Flume should commit on every batch. + */ + private boolean commitOnBatch = DEFAULT_FLUSHABLE_COMMIT_ON_BATCH; + + /** + * Flag that says if Flume should sync on every batch. + */ + private boolean syncOnBatch = DEFAULT_SYNCABLE_SYNC_ON_BATCH; + + /** + * The last time the writer rolled. + */ + private long lastRolledMillis = 0l; - // for working with avro serialized records - private GenericRecord datum = null; + /** + * The raw number of bytes parsed. + */ + private long bytesParsed = 0l; + + /** + * A class for parsing Kite entities from Flume Events. + */ + private EntityParser<GenericRecord> parser = null; + + /** + * A class implementing a failure newPolicy for events that had a + non-recoverable error during processing. 
+ */ + private FailurePolicy failurePolicy = null; + + private SinkCounter counter = null; + + /** + * The Kite entity + */ + private GenericRecord entity = null; // TODO: remove this after PARQUET-62 is released - private boolean reuseDatum = true; - private BinaryDecoder decoder = null; - private LoadingCache<Schema, DatumReader<GenericRecord>> readers = - CacheBuilder.newBuilder() - .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() { - @Override - public DatumReader<GenericRecord> load(Schema schema) { - // must use the target dataset's schema for reading to ensure the - // records are able to be stored using it - return new GenericDatumReader<GenericRecord>( - schema, targetSchema); - } - }); - private static LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder - .newBuilder() - .build(new CacheLoader<String, Schema>() { - @Override - public Schema load(String literal) { - Preconditions.checkNotNull(literal, - "Schema literal cannot be null without a Schema URL"); - return new Schema.Parser().parse(literal); - } - }); - private static LoadingCache<String, Schema> schemasFromURL = CacheBuilder - .newBuilder() - .build(new CacheLoader<String, Schema>() { - @Override - public Schema load(String url) throws IOException { - Schema.Parser parser = new Schema.Parser(); - InputStream is = null; - try { - FileSystem fs = FileSystem.get(URI.create(url), conf); - if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) { - is = fs.open(new Path(url)); - } else { - is = new URL(url).openStream(); - } - return parser.parse(is); - } finally { - if (is != null) { - is.close(); - } - } - } - }); + private boolean reuseEntity = true; + + /** + * The Flume transaction. Used to keep transactions open across calls to + * process. + */ + private Transaction transaction = null; + + /** + * Internal flag on if there has been a batch of records committed. This is + * used during rollback to know if the current writer needs to be closed. + */ + private boolean committedBatch = false; + // Factories + private static final EntityParserFactory ENTITY_PARSER_FACTORY = + new EntityParserFactory(); + private static final FailurePolicyFactory FAILURE_POLICY_FACTORY = + new FailurePolicyFactory(); + + /** + * Return the list of allowed formats. + * @return The list of allowed formats. 
+ */ protected List<String> allowedFormats() { return Lists.newArrayList("avro", "parquet"); } @Override public void configure(Context context) { + this.context = context; + // initialize login credentials this.login = KerberosUtil.login( - context.getString(DatasetSinkConstants.AUTH_PRINCIPAL), - context.getString(DatasetSinkConstants.AUTH_KEYTAB)); - String effectiveUser = - context.getString(DatasetSinkConstants.AUTH_PROXY_USER); + context.getString(AUTH_PRINCIPAL), + context.getString(AUTH_KEYTAB)); + String effectiveUser + = context.getString(AUTH_PROXY_USER); if (effectiveUser != null) { this.login = KerberosUtil.proxyAs(effectiveUser, login); } - String datasetURI = context.getString( - DatasetSinkConstants.CONFIG_KITE_DATASET_URI); + // Get the dataset URI and name from the context + String datasetURI = context.getString(CONFIG_KITE_DATASET_URI); if (datasetURI != null) { - this.target = URI.create(datasetURI); - this.datasetName = uriToName(target); + this.datasetUri = URI.create(datasetURI); + this.datasetName = uriToName(datasetUri); } else { - String repositoryURI = context.getString( - DatasetSinkConstants.CONFIG_KITE_REPO_URI); - Preconditions.checkNotNull(repositoryURI, "Repository URI is missing"); - this.datasetName = context.getString( - DatasetSinkConstants.CONFIG_KITE_DATASET_NAME); - Preconditions.checkNotNull(datasetName, "Dataset name is missing"); - - this.target = new URIBuilder(repositoryURI, datasetName).build(); + String repositoryURI = context.getString(CONFIG_KITE_REPO_URI); + Preconditions.checkNotNull(repositoryURI, "No dataset configured. Setting " + + CONFIG_KITE_DATASET_URI + " is required."); + + this.datasetName = context.getString(CONFIG_KITE_DATASET_NAME); + Preconditions.checkNotNull(datasetName, "No dataset configured. Setting " + + CONFIG_KITE_DATASET_URI + " is required."); + + String namespace = context.getString(CONFIG_KITE_DATASET_NAMESPACE, + DEFAULT_NAMESPACE); + + this.datasetUri = new URIBuilder(repositoryURI, namespace, datasetName) + .build(); + } + this.setName(datasetUri.toString()); + + if (context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH, + DEFAULT_SYNCABLE_SYNC_ON_BATCH)) { + Preconditions.checkArgument( + context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH, + DEFAULT_FLUSHABLE_COMMIT_ON_BATCH), "Configuration error: " + + CONFIG_FLUSHABLE_COMMIT_ON_BATCH + " must be set to true when " + + CONFIG_SYNCABLE_SYNC_ON_BATCH + " is set to true."); } - this.setName(target.toString()); + // Create the configured failure failurePolicy + this.failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context); // other configuration - this.batchSize = context.getLong( - DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE, - DatasetSinkConstants.DEFAULT_BATCH_SIZE); - this.rollIntervalS = context.getInteger( - DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL, - DatasetSinkConstants.DEFAULT_ROLL_INTERVAL); + this.batchSize = context.getLong(CONFIG_KITE_BATCH_SIZE, + DEFAULT_BATCH_SIZE); + this.rollIntervalSeconds = context.getInteger(CONFIG_KITE_ROLL_INTERVAL, + DEFAULT_ROLL_INTERVAL); this.counter = new SinkCounter(datasetName); } @Override public synchronized void start() { - this.lastRolledMs = System.currentTimeMillis(); + this.lastRolledMillis = System.currentTimeMillis(); counter.start(); // signal that this sink is ready to process LOG.info("Started DatasetSink " + getName()); @@ -196,183 +224,359 @@ public class DatasetSink extends AbstractSink implements Configurable { * Causes the sink to roll at the next {@link #process()} call. 
*/ @VisibleForTesting - public void roll() { - this.lastRolledMs = 0l; + void roll() { + this.lastRolledMillis = 0l; + } + + @VisibleForTesting + DatasetWriter<GenericRecord> getWriter() { + return writer; + } + + @VisibleForTesting + void setWriter(DatasetWriter<GenericRecord> writer) { + this.writer = writer; + } + + @VisibleForTesting + void setParser(EntityParser<GenericRecord> parser) { + this.parser = parser; + } + + @VisibleForTesting + void setFailurePolicy(FailurePolicy failurePolicy) { + this.failurePolicy = failurePolicy; } @Override public synchronized void stop() { counter.stop(); - if (writer != null) { - // any write problems invalidate the writer, which is immediately closed - writer.close(); - this.writer = null; - this.lastRolledMs = System.currentTimeMillis(); + try { + // Close the writer and commit the transaction, but don't create a new + // writer since we're stopping + closeWriter(); + commitTransaction(); + } catch (EventDeliveryException ex) { + rollbackTransaction(); + + LOG.warn("Closing the writer failed: " + ex.getLocalizedMessage()); + LOG.debug("Exception follows.", ex); + // We don't propogate the exception as the transaction would have been + // rolled back and we can still finish stopping } - // signal that this sink has stopped + // signal that this sink has stopped LOG.info("Stopped dataset sink: " + getName()); super.stop(); } @Override public Status process() throws EventDeliveryException { - if (writer == null) { - try { - this.writer = newWriter(login, target); - } catch (DatasetException e) { - // DatasetException includes DatasetNotFoundException - throw new EventDeliveryException( - "Cannot write to " + getName(), e); + long processedEvents = 0; + + try { + if (shouldRoll()) { + closeWriter(); + commitTransaction(); + createWriter(); } - } - // handle file rolling - if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) { - // close the current writer and get a new one - writer.close(); - this.writer = newWriter(login, target); - this.lastRolledMs = System.currentTimeMillis(); - LOG.info("Rolled writer for " + getName()); - } + // The writer shouldn't be null at this point + Preconditions.checkNotNull(writer, + "Can't process events with a null writer. This is likely a bug."); + Channel channel = getChannel(); - Channel channel = getChannel(); - Transaction transaction = null; - try { - long processedEvents = 0; + // Enter the transaction boundary if we haven't already + enterTransaction(channel); - transaction = channel.getTransaction(); - transaction.begin(); for (; processedEvents < batchSize; processedEvents += 1) { Event event = channel.take(); + if (event == null) { // no events available in the channel break; } - this.datum = deserialize(event, reuseDatum ? datum : null); - - // writeEncoded would be an optimization in some cases, but HBase - // will not support it and partitioned Datasets need to get partition - // info from the entity Object. We may be able to avoid the - // serialization round-trip otherwise. - writer.write(datum); + write(event); } - // TODO: Add option to sync, depends on CDK-203 - writer.flush(); - - // commit after data has been written and flushed - transaction.commit(); - - if (processedEvents == 0) { - counter.incrementBatchEmptyCount(); - return Status.BACKOFF; - } else if (processedEvents < batchSize) { - counter.incrementBatchUnderflowCount(); - } else { - counter.incrementBatchCompleteCount(); + // commit transaction + if (commitOnBatch) { + // Flush/sync before commiting. 
A failure here will result in rolling back + // the transaction + if (syncOnBatch) { + writer.sync(); + } else { + writer.flush(); + } + boolean committed = commitTransaction(); + Preconditions.checkState(committed, + "Tried to commit a batch when there was no transaction"); + committedBatch |= committed; } - - counter.addToEventDrainSuccessCount(processedEvents); - - return Status.READY; - } catch (Throwable th) { // catch-all for any unhandled Throwable so that the transaction is // correctly rolled back. - if (transaction != null) { + rollbackTransaction(); + + if (commitOnBatch && committedBatch) { try { - transaction.rollback(); - } catch (Exception ex) { - LOG.error("Transaction rollback failed", ex); - throw Throwables.propagate(ex); + closeWriter(); + } catch (EventDeliveryException ex) { + LOG.warn("Error closing writer there may be temp files that need to" + + " be manually recovered: " + ex.getLocalizedMessage()); + LOG.debug("Exception follows.", ex); } + } else { + this.writer = null; } - // close the writer and remove the its reference - writer.close(); - this.writer = null; - this.lastRolledMs = System.currentTimeMillis(); - // handle the exception Throwables.propagateIfInstanceOf(th, Error.class); Throwables.propagateIfInstanceOf(th, EventDeliveryException.class); throw new EventDeliveryException(th); - - } finally { - if (transaction != null) { - transaction.close(); - } } - } - private DatasetWriter<GenericRecord> newWriter( - final UserGroupInformation login, final URI uri) { - View<GenericRecord> view = KerberosUtil.runPrivileged(login, - new PrivilegedExceptionAction<Dataset<GenericRecord>>() { - @Override - public Dataset<GenericRecord> run() { - return Datasets.load(uri); - } - }); - - DatasetDescriptor descriptor = view.getDataset().getDescriptor(); - String formatName = descriptor.getFormat().getName(); - Preconditions.checkArgument(allowedFormats().contains(formatName), - "Unsupported format: " + formatName); - - Schema newSchema = descriptor.getSchema(); - if (targetSchema == null || !newSchema.equals(targetSchema)) { - this.targetSchema = descriptor.getSchema(); - // target dataset schema has changed, invalidate all readers based on it - readers.invalidateAll(); + if (processedEvents == 0) { + counter.incrementBatchEmptyCount(); + return Status.BACKOFF; + } else if (processedEvents < batchSize) { + counter.incrementBatchUnderflowCount(); + } else { + counter.incrementBatchCompleteCount(); } - this.reuseDatum = !("parquet".equals(formatName)); - this.datasetName = view.getDataset().getName(); + counter.addToEventDrainSuccessCount(processedEvents); - return view.newWriter(); + return Status.READY; } /** - * Not thread-safe. + * Parse the event using the entity parser and write the entity to the dataset. * - * @param event - * @param reuse - * @return + * @param event The event to write + * @throws EventDeliveryException An error occurred trying to write to the + dataset that couldn't or shouldn't be + handled by the failure policy. 
*/ - private GenericRecord deserialize(Event event, GenericRecord reuse) - throws EventDeliveryException { - decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder); - // no checked exception is thrown in the CacheLoader - DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event)); + @VisibleForTesting + void write(Event event) throws EventDeliveryException { try { - return reader.read(reuse, decoder); - } catch (IOException ex) { - throw new EventDeliveryException("Cannot deserialize event", ex); + this.entity = parser.parse(event, reuseEntity ? entity : null); + this.bytesParsed += event.getBody().length; + + // writeEncoded would be an optimization in some cases, but HBase + // will not support it and partitioned Datasets need to get partition + // info from the entity Object. We may be able to avoid the + // serialization round-trip otherwise. + writer.write(entity); + } catch (NonRecoverableEventException ex) { + failurePolicy.handle(event, ex); + } catch (DataFileWriter.AppendWriteException ex) { + failurePolicy.handle(event, ex); + } catch (RuntimeException ex) { + Throwables.propagateIfInstanceOf(ex, EventDeliveryException.class); + throw new EventDeliveryException(ex); } } - private static Schema schema(Event event) throws EventDeliveryException { - Map<String, String> headers = event.getHeaders(); - String schemaURL = headers.get( - DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER); + /** + * Create a new writer. + * + * This method also re-loads the dataset so updates to the configuration or + * a dataset created after Flume starts will be loaded. + * + * @throws EventDeliveryException There was an error creating the writer. + */ + @VisibleForTesting + void createWriter() throws EventDeliveryException { + // reset the commited flag whenver a new writer is created + committedBatch = false; try { - if (headers.get(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER) != null) { - return schemasFromURL.get(schemaURL); - } else { - return schemasFromLiteral.get( - headers.get(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER)); + View<GenericRecord> view = KerberosUtil.runPrivileged(login, + new PrivilegedExceptionAction<Dataset<GenericRecord>>() { + @Override + public Dataset<GenericRecord> run() { + return Datasets.load(datasetUri); + } + }); + + DatasetDescriptor descriptor = view.getDataset().getDescriptor(); + Format format = descriptor.getFormat(); + Preconditions.checkArgument(allowedFormats().contains(format.getName()), + "Unsupported format: " + format.getName()); + + Schema newSchema = descriptor.getSchema(); + if (datasetSchema == null || !newSchema.equals(datasetSchema)) { + this.datasetSchema = descriptor.getSchema(); + // dataset schema has changed, create a new parser + parser = ENTITY_PARSER_FACTORY.newParser(datasetSchema, context); } - } catch (ExecutionException ex) { - throw new EventDeliveryException("Cannot get schema", ex.getCause()); + + this.reuseEntity = !(Formats.PARQUET.equals(format)); + + // TODO: Check that the format implements Flushable after CDK-863 + // goes in. For now, just check that the Dataset is Avro format + this.commitOnBatch = context.getBoolean(CONFIG_FLUSHABLE_COMMIT_ON_BATCH, + DEFAULT_FLUSHABLE_COMMIT_ON_BATCH) && (Formats.AVRO.equals(format)); + + // TODO: Check that the format implements Syncable after CDK-863 + // goes in. 
For now, just check that the Dataset is Avro format + this.syncOnBatch = context.getBoolean(CONFIG_SYNCABLE_SYNC_ON_BATCH, + DEFAULT_SYNCABLE_SYNC_ON_BATCH) && (Formats.AVRO.equals(format)); + + this.datasetName = view.getDataset().getName(); + + this.writer = view.newWriter(); + + // Reset the last rolled time and the metrics + this.lastRolledMillis = System.currentTimeMillis(); + this.bytesParsed = 0l; + } catch (DatasetNotFoundException ex) { + throw new EventDeliveryException("Dataset " + datasetUri + " not found." + + " The dataset must be created before Flume can write to it.", ex); + } catch (RuntimeException ex) { + throw new EventDeliveryException("Error trying to open a new" + + " writer for dataset " + datasetUri, ex); } } + /** + * Return true if the sink should roll the writer. + * + * Currently, this is based on time since the last roll or if the current + * writer is null. + * + * @return True if and only if the sink should roll the writer + */ + private boolean shouldRoll() { + long currentTimeMillis = System.currentTimeMillis(); + long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds( + currentTimeMillis - lastRolledMillis); + + LOG.debug("Current time: {}, lastRolled: {}, diff: {} sec", + new Object[] {currentTimeMillis, lastRolledMillis, elapsedTimeSeconds}); + + return elapsedTimeSeconds >= rollIntervalSeconds || writer == null; + } + + /** + * Close the current writer. + * + * This method always sets the current writer to null even if close fails. + * If this method throws an Exception, callers *must* rollback any active + * transaction to ensure that data is replayed. + * + * @throws EventDeliveryException + */ + @VisibleForTesting + void closeWriter() throws EventDeliveryException { + if (writer != null) { + try { + writer.close(); + + long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds( + System.currentTimeMillis() - lastRolledMillis); + LOG.info("Closed writer for {} after {} seconds and {} bytes parsed", + new Object[]{datasetUri, elapsedTimeSeconds, bytesParsed}); + } catch (DatasetIOException ex) { + throw new EventDeliveryException("Check HDFS permissions/health. IO" + + " error trying to close the writer for dataset " + datasetUri, + ex); + } catch (DatasetWriterException ex) { + throw new EventDeliveryException("Failure moving temp file.", ex); + } catch (RuntimeException ex) { + throw new EventDeliveryException("Error trying to close the writer for" + + " dataset " + datasetUri, ex); + } finally { + // If we failed to close the writer then we give up on it as we'll + // end up throwing an EventDeliveryException which will result in + // a transaction rollback and a replay of any events written during + // the current transaction. If commitOnBatch is true, you can still + // end up with orphaned temp files that have data to be recovered. + this.writer = null; + failurePolicy.close(); + } + } + } + + /** + * Enter the transaction boundary. This will either begin a new transaction + * if one didn't already exist. If we're already in a transaction boundary, + * then this method does nothing. + * + * @param channel The Sink's channel + * @throws EventDeliveryException There was an error starting a new batch + * with the failure policy. + */ + private void enterTransaction(Channel channel) throws EventDeliveryException { + // There's no synchronization around the transaction instance because the + // Sink API states "the Sink#process() call is guaranteed to only + // be accessed by a single thread". 
Technically other methods could be + // called concurrently, but the implementation of SinkRunner waits + // for the Thread running process() to end before calling stop() + if (transaction == null) { + this.transaction = channel.getTransaction(); + transaction.begin(); + failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context); + } + } + + /** + * Commit and close the transaction. + * + * If this method throws an Exception the caller *must* ensure that the + * transaction is rolled back. Callers can roll back the transaction by + * calling {@link #rollbackTransaction()}. + * + * @return True if there was an open transaction and it was committed, false + * otherwise. + * @throws EventDeliveryException There was an error ending the batch with + * the failure policy. + */ + @VisibleForTesting + boolean commitTransaction() throws EventDeliveryException { + if (transaction != null) { + failurePolicy.sync(); + transaction.commit(); + transaction.close(); + this.transaction = null; + return true; + } else { + return false; + } + } + + /** + * Rollback the transaction. If there is a RuntimeException during rollback, + * it will be logged but the transaction instance variable will still be + * nullified. + */ + private void rollbackTransaction() { + if (transaction != null) { + try { + // If the transaction wasn't committed before we got the exception, we + // need to rollback. + transaction.rollback(); + } catch (RuntimeException ex) { + LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage()); + LOG.debug("Exception follows.", ex); + } finally { + transaction.close(); + this.transaction = null; + } + } +} + + /** + * Get the name of the dataset from the URI + * + * @param uri The dataset or view URI + * @return The dataset name + */ private static String uriToName(URI uri) { return Registration.lookupDatasetUri(URI.create( - uri.getRawSchemeSpecificPart())).second().get("dataset"); + uri.getRawSchemeSpecificPart())).second().get("dataset"); } } http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java index 3c67738..af33304 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java @@ -18,9 +18,11 @@ package org.apache.flume.sink.kite; +import org.kitesdk.data.URIBuilder; + public class DatasetSinkConstants { /** - * URI of the Kite DatasetRepository. + * URI of the Kite Dataset */ public static final String CONFIG_KITE_DATASET_URI = "kite.dataset.uri"; @@ -35,6 +37,13 @@ public class DatasetSinkConstants { public static final String CONFIG_KITE_DATASET_NAME = "kite.dataset.name"; /** + * Namespace of the Kite Dataset to write into. + */ + public static final String CONFIG_KITE_DATASET_NAMESPACE = + "kite.dataset.namespace"; + public static final String DEFAULT_NAMESPACE = URIBuilder.NAMESPACE_DEFAULT; + + /** * Number of records to process from the incoming channel per call to process. 
*/ public static final String CONFIG_KITE_BATCH_SIZE = "kite.batchSize"; @@ -47,7 +56,68 @@ public class DatasetSinkConstants { public static int DEFAULT_ROLL_INTERVAL = 30; // seconds /** - * Headers with avro schema information is expected. + * Flag for committing the Flume transaction on each batch for Flushable + * datasets. When set to false, Flume will only commit the transaction when + * roll interval has expired. Setting this to false requires enough space + * in the channel to handle all events delivered during the roll interval. + * Defaults to true. + */ + public static final String CONFIG_FLUSHABLE_COMMIT_ON_BATCH = + "kite.flushable.commitOnBatch"; + public static boolean DEFAULT_FLUSHABLE_COMMIT_ON_BATCH = true; + + /** + * Flag for syncing the DatasetWriter on each batch for Syncable + * datasets. Defaults to true. + */ + public static final String CONFIG_SYNCABLE_SYNC_ON_BATCH = + "kite.syncable.syncOnBatch"; + public static boolean DEFAULT_SYNCABLE_SYNC_ON_BATCH = true; + + /** + * Parser used to parse Flume Events into Kite entities. + */ + public static final String CONFIG_ENTITY_PARSER = "kite.entityParser"; + + /** + * Built-in entity parsers + */ + public static final String AVRO_ENTITY_PARSER = "avro"; + public static final String DEFAULT_ENTITY_PARSER = AVRO_ENTITY_PARSER; + public static final String[] AVAILABLE_PARSERS = new String[] { + AVRO_ENTITY_PARSER + }; + + /** + * Policy used to handle non-recoverable failures. + */ + public static final String CONFIG_FAILURE_POLICY = "kite.failurePolicy"; + + /** + * Write non-recoverable Flume events to a Kite dataset. + */ + public static final String SAVE_FAILURE_POLICY = "save"; + + /** + * The URI to write non-recoverable Flume events to in the case of an error. + * If the dataset doesn't exist, it will be created. + */ + public static final String CONFIG_KITE_ERROR_DATASET_URI = + "kite.error.dataset.uri"; + + /** + * Retry non-recoverable Flume events. This will lead to a never-ending cycle + * of failure, but matches the previous default semantics of the DatasetSink. + */ + public static final String RETRY_FAILURE_POLICY = "retry"; + public static final String DEFAULT_FAILURE_POLICY = RETRY_FAILURE_POLICY; + public static final String[] AVAILABLE_POLICIES = new String[] { + RETRY_FAILURE_POLICY, + SAVE_FAILURE_POLICY + }; + + /** + * Headers where avro schema information is expected. */ public static final String AVRO_SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal"; http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java new file mode 100644 index 0000000..8f6c0ae --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.kite; + + +/** + * A non-recoverable error trying to deliver the event. + * + * Non-recoverable event delivery failures include: + * + * 1. Error parsing the event body thrown from the {@link EntityParser} + * 2. A schema mismatch between the schema of an event and the schema of the + * destination dataset. + * 3. A missing schema from the Event header when using the + * {@link AvroEntityParser}. + */ +public class NonRecoverableEventException extends Exception { + + private static final long serialVersionUID = 3485151222482254285L; + + public NonRecoverableEventException() { + super(); + } + + public NonRecoverableEventException(String message) { + super(message); + } + + public NonRecoverableEventException(String message, Throwable t) { + super(message, t); + } + + public NonRecoverableEventException(Throwable t) { + super(t); + } + + +} http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java new file mode 100644 index 0000000..7c6a723 --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.kite.parser; + +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URL; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.kite.NonRecoverableEventException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.flume.sink.kite.DatasetSinkConstants.*; + +/** + * An {@link EntityParser} that parses Avro serialized bytes from an event. + * + * The Avro schema used to serialize the data should be set as either a URL + * or literal in the flume.avro.schema.url or flume.avro.schema.literal event + * headers respectively. + */ +public class AvroParser implements EntityParser<GenericRecord> { + + static Configuration conf = new Configuration(); + + /** + * A cache of literal schemas to avoid re-parsing the schema. + */ + private static final LoadingCache<String, Schema> schemasFromLiteral = + CacheBuilder.newBuilder() + .build(new CacheLoader<String, Schema>() { + @Override + public Schema load(String literal) { + Preconditions.checkNotNull(literal, + "Schema literal cannot be null without a Schema URL"); + return new Schema.Parser().parse(literal); + } + }); + + /** + * A cache of schemas retrieved by URL to avoid re-parsing the schema. + */ + private static final LoadingCache<String, Schema> schemasFromURL = + CacheBuilder.newBuilder() + .build(new CacheLoader<String, Schema>() { + @Override + public Schema load(String url) throws IOException { + Schema.Parser parser = new Schema.Parser(); + InputStream is = null; + try { + FileSystem fs = FileSystem.get(URI.create(url), conf); + if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) { + is = fs.open(new Path(url)); + } else { + is = new URL(url).openStream(); + } + return parser.parse(is); + } finally { + if (is != null) { + is.close(); + } + } + } + }); + + /** + * The schema of the destination dataset. + * + * Used as the reader schema during parsing. + */ + private final Schema datasetSchema; + + /** + * A cache of DatumReaders per schema. + */ + private final LoadingCache<Schema, DatumReader<GenericRecord>> readers = + CacheBuilder.newBuilder() + .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() { + @Override + public DatumReader<GenericRecord> load(Schema schema) { + // must use the target dataset's schema for reading to ensure the + // records are able to be stored using it + return new GenericDatumReader<GenericRecord>( + schema, datasetSchema); + } + }); + + /** + * The binary decoder to reuse for event parsing. + */ + private BinaryDecoder decoder = null; + + /** + * Create a new AvroParser given the schema of the destination dataset. + * + * @param datasetSchema The schema of the destination dataset. 
+ */ + private AvroParser(Schema datasetSchema) { + this.datasetSchema = datasetSchema; + } + + /** + * Parse the entity from the body of the given event. + * + * @param event The event to parse. + * @param reuse If non-null, this may be reused and returned from this method. + * @return The parsed entity as a GenericRecord. + * @throws EventDeliveryException A recoverable error such as an error + * downloading the schema from the URL has + * occurred. + * @throws NonRecoverableEventException A non-recoverable error such as an + * unparsable schema or entity has + * occurred. + */ + @Override + public GenericRecord parse(Event event, GenericRecord reuse) + throws EventDeliveryException, NonRecoverableEventException { + decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder); + + try { + DatumReader<GenericRecord> reader = readers.getUnchecked(schema(event)); + return reader.read(reuse, decoder); + } catch (IOException ex) { + throw new NonRecoverableEventException("Cannot deserialize event", ex); + } catch (RuntimeException ex) { + throw new NonRecoverableEventException("Cannot deserialize event", ex); + } + } + + /** + * Get the schema from the event headers. + * + * @param event The Flume event + * @return The schema for the event + * @throws EventDeliveryException A recoverable error such as an error + * downloading the schema from the URL has + * occurred. + * @throws NonRecoverableEventException A non-recoverable error such as an + * unparsable schema has occurred. + */ + private static Schema schema(Event event) throws EventDeliveryException, + NonRecoverableEventException { + Map<String, String> headers = event.getHeaders(); + String schemaURL = headers.get(AVRO_SCHEMA_URL_HEADER); + try { + if (schemaURL != null) { + return schemasFromURL.get(schemaURL); + } else { + String schemaLiteral = headers.get(AVRO_SCHEMA_LITERAL_HEADER); + if (schemaLiteral == null) { + throw new NonRecoverableEventException("No schema in event headers." + + " Headers must include either " + AVRO_SCHEMA_URL_HEADER + + " or " + AVRO_SCHEMA_LITERAL_HEADER); + } + + return schemasFromLiteral.get(schemaLiteral); + } + } catch (ExecutionException ex) { + throw new EventDeliveryException("Cannot get schema", ex.getCause()); + } catch (UncheckedExecutionException ex) { + throw new NonRecoverableEventException("Cannot parse schema", + ex.getCause()); + } + } + + public static class Builder implements EntityParser.Builder<GenericRecord> { + + @Override + public EntityParser<GenericRecord> build(Schema datasetSchema, Context config) { + return new AvroParser(datasetSchema); + } + + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java new file mode 100644 index 0000000..f2051a2 --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParser.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.kite.parser; + +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.avro.Schema; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.kite.NonRecoverableEventException; + +@NotThreadSafe +public interface EntityParser<E> { + + /** + * Parse a Kite entity from a Flume event + * + * @param event The event to parse + * @param reuse If non-null, this may be reused and returned + * @return The parsed entity + * @throws EventDeliveryException A recoverable error during parsing. Parsing + * can be safely retried. + * @throws NonRecoverableEventException A non-recoverable error during + * parsing. The event must be discarded. + * + */ + public E parse(Event event, E reuse) throws EventDeliveryException, + NonRecoverableEventException; + + /** + * Knows how to build {@code EntityParser}s. Implementers must provide a + * no-arg constructor. + * + * @param <E> The type of entities generated + */ + public static interface Builder<E> { + + public EntityParser<E> build(Schema datasetSchema, Context config); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java new file mode 100644 index 0000000..cfb7349 --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.kite.parser; + +import java.util.Arrays; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.flume.Context; + +import static org.apache.flume.sink.kite.DatasetSinkConstants.*; + + +public class EntityParserFactory { + + public EntityParser<GenericRecord> newParser(Schema datasetSchema, Context config) { + EntityParser<GenericRecord> parser; + + String parserType = config.getString(CONFIG_ENTITY_PARSER, + DEFAULT_ENTITY_PARSER); + + if (parserType.equals(AVRO_ENTITY_PARSER)) { + parser = new AvroParser.Builder().build(datasetSchema, config); + } else { + + Class<? extends EntityParser.Builder> builderClass; + Class c; + try { + c = Class.forName(parserType); + } catch (ClassNotFoundException ex) { + throw new IllegalArgumentException("EntityParser.Builder class " + + parserType + " not found. Must set " + CONFIG_ENTITY_PARSER + + " to a class that implements EntityParser.Builder or to a builtin" + + " parser: " + Arrays.toString(AVAILABLE_PARSERS), ex); + } + + if (c != null && EntityParser.Builder.class.isAssignableFrom(c)) { + builderClass = c; + } else { + throw new IllegalArgumentException("Class " + parserType + " does not" + + " implement EntityParser.Builder. Must set " + + CONFIG_ENTITY_PARSER + " to a class that extends" + + " EntityParser.Builder or to a builtin parser: " + + Arrays.toString(AVAILABLE_PARSERS)); + } + + EntityParser.Builder<GenericRecord> builder; + try { + builder = builderClass.newInstance(); + } catch (InstantiationException ex) { + throw new IllegalArgumentException("Can't instantiate class " + + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class" + + " that extends EntityParser.Builder or to a builtin parser: " + + Arrays.toString(AVAILABLE_PARSERS), ex); + } catch (IllegalAccessException ex) { + throw new IllegalArgumentException("Can't instantiate class " + + parserType + ". Must set " + CONFIG_ENTITY_PARSER + " to a class" + + " that extends EntityParser.Builder or to a builtin parser: " + + Arrays.toString(AVAILABLE_PARSERS), ex); + } + + parser = builder.build(datasetSchema, config); + } + + return parser; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java new file mode 100644 index 0000000..47b6a25 --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicy.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.kite.policy; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.kite.DatasetSink; +import org.kitesdk.data.DatasetWriter; + +/** + * A policy for dealing with non-recoverable event delivery failures. + * + * Non-recoverable event delivery failures include: + * + * 1. Error parsing the event body thrown from the {@link EntityParser} + * 2. A schema mismatch between the schema of an event and the schema of the + * destination dataset. + * 3. A missing schema from the Event header when using the + * {@link AvroEntityParser}. + * + * The life cycle of a FailurePolicy mimics the life cycle of the + * {@link DatasetSink#writer}: + * + * 1. When a new writer is created, the policy will be instantiated. + * 2. As Event failures happen, + * {@link #handle(org.apache.flume.Event, java.lang.Throwable)} will be + * called to let the policy handle the failure. + * 3. If the {@link DatasetSink} is configured to commit on batch, then the + * {@link #sync()} method will be called when the batch is committed. + * 4. When the writer is closed, the policy's {@link #close()} method will be + * called. + */ +public interface FailurePolicy { + + /** + * Handle a non-recoverable event. + * + * @param event The event + * @param cause The cause of the failure + * @throws EventDeliveryException The policy failed to handle the event. When + * this is thrown, the Flume transaction will + * be rolled back and the event will be retried + * along with the rest of the batch. + */ + public void handle(Event event, Throwable cause) + throws EventDeliveryException; + + /** + * Ensure any handled events are on stable storage. + * + * This allows the policy implementation to sync any data that it may not + * have fully handled. + * + * See {@link DatasetWriter#sync()}. + * + * @throws EventDeliveryException The policy failed while syncing data. + * When this is thrown, the Flume transaction + * will be rolled back and the batch will be + * retried. + */ + public void sync() throws EventDeliveryException; + + /** + * Close this FailurePolicy and release any resources. + * + * @throws EventDeliveryException The policy failed while closing resources. + * When this is thrown, the Flume transaction + * will be rolled back and the batch will be + * retried. + */ + public void close() throws EventDeliveryException; + + /** + * Knows how to build {@code FailurePolicy}s. Implementers must provide a + * no-arg constructor. 
http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
new file mode 100644
index 0000000..a8b2008
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.policy;
+
+import java.util.Arrays;
+import org.apache.flume.Context;
+
+import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
+
+
+public class FailurePolicyFactory {
+
+  public FailurePolicy newPolicy(Context config) {
+    FailurePolicy policy;
+
+    String policyType = config.getString(CONFIG_FAILURE_POLICY,
+        DEFAULT_FAILURE_POLICY);
+
+    if (policyType.equals(RETRY_FAILURE_POLICY)) {
+      policy = new RetryPolicy.Builder().build(config);
+    } else if (policyType.equals(SAVE_FAILURE_POLICY)) {
+      policy = new SavePolicy.Builder().build(config);
+    } else {
+
+      Class<? extends FailurePolicy.Builder> builderClass;
+      Class c;
+      try {
+        c = Class.forName(policyType);
+      } catch (ClassNotFoundException ex) {
+        throw new IllegalArgumentException("FailurePolicy.Builder class " +
+            policyType + " not found. Must set " + CONFIG_FAILURE_POLICY +
+            " to a class that implements FailurePolicy.Builder or to a builtin" +
+            " policy: " + Arrays.toString(AVAILABLE_POLICIES), ex);
+      }
+
+      if (c != null && FailurePolicy.Builder.class.isAssignableFrom(c)) {
+        builderClass = c;
+      } else {
+        throw new IllegalArgumentException("Class " + policyType + " does not" +
+            " implement FailurePolicy.Builder. Must set " +
+            CONFIG_FAILURE_POLICY + " to a class that extends" +
+            " FailurePolicy.Builder or to a builtin policy: " +
+            Arrays.toString(AVAILABLE_POLICIES));
+      }
+
+      FailurePolicy.Builder builder;
+      try {
+        builder = builderClass.newInstance();
+      } catch (InstantiationException ex) {
+        throw new IllegalArgumentException("Can't instantiate class " +
+            policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a class" +
+            " that extends FailurePolicy.Builder or to a builtin policy: " +
+            Arrays.toString(AVAILABLE_POLICIES), ex);
+      } catch (IllegalAccessException ex) {
+        throw new IllegalArgumentException("Can't instantiate class " +
+            policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a class" +
+            " that extends FailurePolicy.Builder or to a builtin policy: " +
+            Arrays.toString(AVAILABLE_POLICIES), ex);
+      }
+
+      policy = builder.build(config);
+    }
+
+    return policy;
+  }
+}
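Continuing the hypothetical LogAndDropPolicy sketch above, the factory can load such a builder by its class name; this snippet assumes the sketch is on the classpath and uses the CONFIG_FAILURE_POLICY constant that the factory reads.

import org.apache.flume.Context;
import org.apache.flume.sink.kite.DatasetSinkConstants;
import org.apache.flume.sink.kite.policy.FailurePolicy;
import org.apache.flume.sink.kite.policy.FailurePolicyFactory;

public class FailurePolicyFactorySketch {
  public static void main(String[] args) {
    // Name the builder of the hypothetical LogAndDropPolicy from the previous
    // sketch; the factory loads it reflectively via Class.forName.
    Context config = new Context();
    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
        LogAndDropPolicy.Builder.class.getName());

    FailurePolicy policy = new FailurePolicyFactory().newPolicy(config);
    System.out.println("Loaded policy: " + policy.getClass().getName());
  }
}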
Must set " + CONFIG_FAILURE_POLICY + " to a class" + + " that extends FailurePolicy.Builder or to a builtin policy: " + + Arrays.toString(AVAILABLE_POLICIES), ex); + } catch (IllegalAccessException ex) { + throw new IllegalArgumentException("Can't instantiate class " + + policyType + ". Must set " + CONFIG_FAILURE_POLICY + " to a class" + + " that extends FailurePolicy.Builder or to a builtin policy: " + + Arrays.toString(AVAILABLE_POLICIES), ex); + } + + policy = builder.build(config); + } + + return policy; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java new file mode 100644 index 0000000..9a4991c --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/RetryPolicy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.kite.policy; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A failure policy that logs the error and then forces a retry by throwing + * {@link EventDeliveryException}. 
http://git-wip-us.apache.org/repos/asf/flume/blob/914106c0/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
new file mode 100644
index 0000000..ed47898
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/SavePolicy.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kite.policy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.View;
+
+import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
+
+/**
+ * A failure policy that writes the raw Flume event to a Kite dataset.
+ */
+public class SavePolicy implements FailurePolicy {
+
+  private final View<AvroFlumeEvent> dataset;
+  private DatasetWriter<AvroFlumeEvent> writer;
+  private int nEventsHandled;
+
+  private SavePolicy(Context context) {
+    String uri = context.getString(CONFIG_KITE_ERROR_DATASET_URI);
+    Preconditions.checkArgument(uri != null, "Must set " +
+        CONFIG_KITE_ERROR_DATASET_URI + " when " + CONFIG_FAILURE_POLICY +
+        "=save");
+    if (Datasets.exists(uri)) {
+      dataset = Datasets.load(uri, AvroFlumeEvent.class);
+    } else {
+      DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+          .schema(AvroFlumeEvent.class)
+          .build();
+      dataset = Datasets.create(uri, descriptor, AvroFlumeEvent.class);
+    }
+
+    nEventsHandled = 0;
+  }
+
+  @Override
+  public void handle(Event event, Throwable cause) throws EventDeliveryException {
+    try {
+      if (writer == null) {
+        writer = dataset.newWriter();
+      }
+
+      final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+      avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+      avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));
+
+      writer.write(avroEvent);
+      nEventsHandled++;
+    } catch (RuntimeException ex) {
+      throw new EventDeliveryException(ex);
+    }
+  }
+
+  @Override
+  public void sync() throws EventDeliveryException {
+    if (nEventsHandled > 0) {
+      if (Formats.PARQUET.equals(
+          dataset.getDataset().getDescriptor().getFormat())) {
+        // We need to close the writer on sync if we're writing to a Parquet
+        // dataset
+        close();
+      } else {
+        writer.sync();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws EventDeliveryException {
+    if (nEventsHandled > 0) {
+      try {
+        writer.close();
+      } catch (RuntimeException ex) {
+        throw new EventDeliveryException(ex);
+      } finally {
+        writer = null;
+        nEventsHandled = 0;
+      }
+    }
+  }
+
+  /**
+   * Helper function to convert a map of String to a map of CharSequence.
+   */
+  private static Map<CharSequence, CharSequence> toCharSeqMap(
+      Map<String, String> map) {
+    return Maps.<CharSequence, CharSequence>newHashMap(map);
+  }
+
+  public static class Builder implements FailurePolicy.Builder {
+
+    @Override
+    public FailurePolicy build(Context config) {
+      return new SavePolicy(config);
+    }
+
+  }
+}
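Events saved by this policy can later be read back with the ordinary Kite API for inspection or replay. A hypothetical sketch follows, with a placeholder dataset URI standing in for whatever the error dataset URI is configured to:

import java.nio.ByteBuffer;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.View;

public class ErrorDatasetReaderSketch {
  public static void main(String[] args) {
    // Placeholder URI; use the URI the error dataset was actually created with.
    View<AvroFlumeEvent> errors =
        Datasets.load("dataset:hdfs:/tmp/flume_error_events", AvroFlumeEvent.class);

    DatasetReader<AvroFlumeEvent> reader = errors.newReader();
    try {
      while (reader.hasNext()) {
        AvroFlumeEvent saved = reader.next();
        ByteBuffer body = saved.getBody();
        System.out.println("headers=" + saved.getHeaders()
            + " bodyBytes=" + body.remaining());
      }
    } finally {
      reader.close();
    }
  }
}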
