Updated Branches:
  refs/heads/flume-1.4 6ba625c8e -> 2b66fb8d3

FLUME-2015. ElasticSearchSink: need access to IndexRequestBuilder instance during flume event processing

(Tim Bacon and Edward Sargisson via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2b66fb8d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2b66fb8d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2b66fb8d

Branch: refs/heads/flume-1.4
Commit: 2b66fb8d33d86bd7dabbb1b4f911c38c892dbdaa
Parents: 6ba625c
Author: Mike Percy <[email protected]>
Authored: Fri May 10 13:17:32 2013 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri May 10 13:18:56 2013 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   56 +++--
 ...actElasticSearchIndexRequestBuilderFactory.java |  158 ++++++++++++
 .../ElasticSearchEventSerializer.java              |    8 +-
 .../ElasticSearchIndexRequestBuilderFactory.java   |   58 +++++
 .../sink/elasticsearch/ElasticSearchSink.java      |   36 ++--
 .../EventSerializerIndexRequestBuilderFactory.java |   69 +++++
 .../AbstractElasticSearchSinkTest.java             |   19 ++-
 ...estElasticSearchIndexRequestBuilderFactory.java |  194 +++++++++++++++
 .../sink/elasticsearch/TestElasticSearchSink.java  |  107 ++++++++-
 9 files changed, 655 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d09a3f7..d129abf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1765,31 +1765,47 @@ Example for agent named a1:
   a1.sinks.k1.channel = c1
 
 ElasticSearchSink
-'''''''''''''''''
+~~~~~~~~~~~~~~~~~
+
+This sink writes data to an elasticsearch cluster. By default, events are written so that the `Kibana <http://kibana.org>`_ graphical interface
+can display them, just as if `logstash <https://logstash.net>`_ had written them.
+
+The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation.
+Elasticsearch requires that the major version of the client JAR match that of the server and that both run the same minor version
+of the JVM; otherwise SerializationExceptions will appear. To
+select the required version, first determine the elasticsearch version and the JVM version that the target cluster is running. Then select an elasticsearch client
+library whose major version matches: a 0.19.x client can talk to a 0.19.x cluster, 0.20.x can talk to 0.20.x, and 0.90.x can talk to 0.90.x. Once the
+elasticsearch version has been determined, read its pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent
+running the ElasticSearchSink should also match the JVM version of the target cluster down to the minor version.
+
+Events are written to a new index every day. The index name is ``<indexName>-yyyy-MM-dd``, where ``<indexName>`` is the indexName parameter. The sink
+starts writing to a new index at midnight UTC.
+
+By default, events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer. This behaviour can be
+overridden with the serializer parameter, which accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer
+or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of
+the more powerful ElasticSearchIndexRequestBuilderFactory.
 
-This sink writes data to ElasticSearch. A class implementing
-ElasticSearchEventSerializer which is specified by the configuration is used to convert the events into
-XContentBuilder which detail the fields and mappings which will be indexed. These are then then written
-to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with
-a single large index
 The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
+
 Required properties are in **bold**.
 
-================  ==================================================================  =======================================================================================================
-Property Name     Default                                                             Description
-================  ==================================================================  =======================================================================================================
+================  ======================================================================== =======================================================================================================
+Property Name     Default                                                                  Description
+================  ======================================================================== =======================================================================================================
 **channel**       --
-**type**          --                                                                  The component type name, needs to be ``elasticsearch``
-**hostNames**     --                                                                  Comma separated list of hostname:port, if the port is not present the default port '9300' will be used
-indexName         flume                                                               The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd'
-indexType         logs                                                                The type to index the document to, defaults to 'log'
-clusterName       elasticsearch                                                       Name of the ElasticSearch cluster to connect to
-batchSize         100                                                                 Number of events to be written per txn.
-ttl               --                                                                  TTL in days, when set will cause the expired documents to be deleted automatically,
-                                                                                      if not set documents will never be automatically deleted
-serializer        org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
-serializer.*      --                                                                  Properties to be passed to the serializer.
-================  ==================================================================  =======================================================================================================
+**type**          --                                                                       The component type name, needs to be ``org.apache.flume.sink.elasticsearch.ElasticSearchSink``
+**hostNames**     --                                                                       Comma separated list of hostname:port. If the port is not present, the default port '9300' is used
+indexName         flume                                                                    The name of the index to which the date will be appended. Example: 'flume' -> 'flume-yyyy-MM-dd'
+indexType         logs                                                                     The type to index the document to
+clusterName       elasticsearch                                                            Name of the ElasticSearch cluster to connect to
+batchSize         100                                                                      Number of events to be written per transaction
+ttl               --                                                                       TTL in days. When set, expired documents are deleted automatically;
+                                                                                           if not set, documents are never deleted automatically
+serializer        org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of
+                                                                                           either interface are accepted, but ElasticSearchIndexRequestBuilderFactory is preferred.
+serializer.*      --                                                                       Properties to be passed to the serializer.
+================  ======================================================================== =======================================================================================================
 
 Example for agent named a1:
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
new file mode 100644
index 0000000..6effe34
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.apache.flume.event.SimpleEvent;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.joda.time.DateTimeUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
+/**
+ * Abstract base class for custom implementations of
+ * {@link ElasticSearchIndexRequestBuilderFactory}.
+ */
+public abstract class AbstractElasticSearchIndexRequestBuilderFactory
+    implements ElasticSearchIndexRequestBuilderFactory {
+
+  /**
+   * {@link FastDateFormat} to use for index names
+   *   in {@link #getIndexName(String, long)}
+   */
+  protected final FastDateFormat fastDateFormat;
+
+  /**
+   * Constructor for subclasses
+   * @param fastDateFormat {@link FastDateFormat} to use for index names
+   */
+  protected AbstractElasticSearchIndexRequestBuilderFactory(
+    FastDateFormat fastDateFormat) {
+    this.fastDateFormat = fastDateFormat;
+  }
+
+  /**
+   * @see Configurable
+   */
+  @Override
+  public abstract void configure(Context context);
+
+  /**
+   * @see ConfigurableComponent
+   */
+  @Override
+  public abstract void configure(ComponentConfiguration config);
+
+  /**
+   * Creates and prepares an {@link IndexRequestBuilder} from the supplied
+   * {@link Client} via delegation to the subclass-hook template methods
+   * {@link #getIndexName(String, long)} and
+   * {@link #prepareIndexRequest(IndexRequestBuilder, String, String, Event)}
+   */
+  @Override
+  public IndexRequestBuilder createIndexRequest(Client client,
+        String indexPrefix, String indexType, Event event) throws IOException {
+    IndexRequestBuilder request = prepareIndex(client);
+    TimestampedEvent timestampedEvent = new TimestampedEvent(event);
+    long timestamp = timestampedEvent.getTimestamp();
+    String indexName = getIndexName(indexPrefix, timestamp);
+    prepareIndexRequest(request, indexName, indexType, timestampedEvent);
+    return request;
+  }
+
+  @VisibleForTesting
+  IndexRequestBuilder prepareIndex(Client client) {
+    return client.prepareIndex();
+  }
+
+  /**
+   * Gets the name of the index to use for an index request
+   * @return index name of the form 'indexPrefix-formattedTimestamp'
+   * @param indexPrefix
+   *          Prefix of index name to use -- as configured on the sink
+   * @param timestamp
+   *          timestamp (millis) to format / use
+   */
+  protected String getIndexName(String indexPrefix, long timestamp) {
+    return new StringBuilder(indexPrefix).append('-')
+      .append(fastDateFormat.format(timestamp)).toString();
+  }
+
+  /**
+   * Prepares an ElasticSearch {@link IndexRequestBuilder} instance
+   * @param indexRequest
+   *          The (empty) ElasticSearch {@link IndexRequestBuilder} to prepare
+   * @param indexName
+   *          Index name to use -- as per {@link #getIndexName(String, long)}
+   * @param indexType
+   *          Index type to use -- as configured on the sink
+   * @param event
+   *          Flume event to serialize and add to index request
+   * @throws IOException
+   *           If an error occurs e.g. during serialization
+   */
+  protected abstract void prepareIndexRequest(
+      IndexRequestBuilder indexRequest, String indexName,
+      String indexType, Event event) throws IOException;
+
+}
+
+/**
+ * {@link Event} implementation that has a timestamp.
+ * The timestamp is taken from (in order of precedence):<ol>
+ * <li>The "timestamp" header of the base event, if present</li>
+ * <li>The "@timestamp" header of the base event, if present</li>
+ * <li>The current time in millis, otherwise</li>
+ * </ol>
+ */
+final class TimestampedEvent extends SimpleEvent {
+
+  private final long timestamp;
+
+  TimestampedEvent(Event base) {
+    setBody(base.getBody());
+    Map<String, String> headers = Maps.newHashMap(base.getHeaders());
+    String timestampString = headers.get("timestamp");
+    if (StringUtils.isBlank(timestampString)) {
+      timestampString = headers.get("@timestamp");
+    }
+    if (StringUtils.isBlank(timestampString)) {
+      this.timestamp = DateTimeUtils.currentTimeMillis();
+      headers.put("timestamp", String.valueOf(timestamp));
+    } else {
+      this.timestamp = Long.parseLong(timestampString);
+    }
+    setHeaders(headers);
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+}
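TimestampedEvent resolves the event time from the "timestamp" header, then "@timestamp", then the current time, and only writes a "timestamp" header back when it had to fall back to the clock. The following is a minimal, dependency-free sketch of that precedence, assuming plain `Map<String, String>` headers instead of a Flume `Event`; `TimestampResolver`, `Resolved` and the `LongSupplier` clock parameter are hypothetical names, and the `isBlank` helper only approximates commons-lang `StringUtils.isBlank`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch of the header precedence used by TimestampedEvent. */
public final class TimestampResolver {

  /** Resolved timestamp plus the copied headers that go with it. */
  static final class Resolved {
    final long timestamp;
    final Map<String, String> headers;

    Resolved(long timestamp, Map<String, String> headers) {
      this.timestamp = timestamp;
      this.headers = headers;
    }
  }

  private TimestampResolver() {}

  // Approximates commons-lang StringUtils.isBlank for this sketch.
  static boolean isBlank(String s) {
    return s == null || s.trim().isEmpty();
  }

  static Resolved resolve(Map<String, String> original, LongSupplier clock) {
    // Copy so the caller's header map is never mutated.
    Map<String, String> headers = new HashMap<>(original);
    String value = headers.get("timestamp");
    if (isBlank(value)) {
      value = headers.get("@timestamp");
    }
    long timestamp;
    if (isBlank(value)) {
      // Neither header present: use the clock and record the value.
      timestamp = clock.getAsLong();
      headers.put("timestamp", String.valueOf(timestamp));
    } else {
      // As in the patch, a non-numeric value throws NumberFormatException.
      timestamp = Long.parseLong(value);
    }
    return new Resolved(timestamp, headers);
  }

  public static void main(String[] args) {
    Map<String, String> both = new HashMap<>();
    both.put("timestamp", "1000");
    both.put("@timestamp", "2000");
    System.out.println(resolve(both, () -> 42L).timestamp);          // 1000
    System.out.println(resolve(new HashMap<>(), () -> 42L).headers); // {timestamp=42}
  }
}
```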

http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
index dc6a093..c89d627 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
@@ -24,7 +24,7 @@ import java.nio.charset.Charset;
 import org.apache.flume.Event;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurableComponent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.io.BytesStream;
 
 /**
  * Interface for an event serializer which serializes the headers and body of an
@@ -37,12 +37,12 @@ public interface ElasticSearchEventSerializer extends Configurable,
   public static final Charset charset = Charset.defaultCharset();
 
   /**
-   * Return an {@link XContentBuilder} made up of the serialized flume event
+   * Return a {@link BytesStream} made up of the serialized flume event
    * @param event
    *          The flume event to serialize
-   * @return A {@link XContentBuilder} used to write to ElasticSearch
+   * @return A {@link BytesStream} used to write to ElasticSearch
    * @throws IOException
    *           If an error occurs during serialization
    */
-  abstract XContentBuilder getContentBuilder(Event event) throws IOException;
+  abstract BytesStream getContentBuilder(Event event) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
new file mode 100644
index 0000000..8e77a1e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+import java.util.TimeZone;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+
+/**
+ * Interface for creating ElasticSearch {@link IndexRequestBuilder}
+ * instances from serialized flume events. This is configurable, so any config
+ * params required should be taken through this.
+ */
+public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
+    ConfigurableComponent {
+
+  static final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd",
+      TimeZone.getTimeZone("Etc/UTC"));
+
+  /**
+   * @return prepared ElasticSearch {@link IndexRequestBuilder} instance
+   * @param client
+   *          ElasticSearch {@link Client} to prepare index from
+   * @param indexPrefix
+   *          Prefix of index name to use -- as configured on the sink
+   * @param indexType
+   *          Index type to use -- as configured on the sink
+   * @param event
+   *          Flume event to serialize and add to index request
+   * @throws IOException
+   *           If an error occurs e.g. during serialization
+   */
+  IndexRequestBuilder createIndexRequest(Client client,
+      String indexPrefix, String indexType, Event event) throws IOException;
+
+}
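The interface now pins `df` to `Etc/UTC`, whereas the removed `ElasticSearchSink.df` used the JVM default time zone. The sketch below shows why that matters: the index rolls over at midnight UTC, and the same instant can land on a different calendar day in another zone. It swaps in `java.time.DateTimeFormatter` for commons-lang `FastDateFormat`, and the class `DailyIndexName` is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch of "prefix-yyyy-MM-dd" daily index naming. */
public final class DailyIndexName {

  // Same pattern as ElasticSearchIndexRequestBuilderFactory.df, pinned to UTC.
  private static final DateTimeFormatter UTC_DAY =
      DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

  private DailyIndexName() {}

  /** Index name for the UTC calendar day containing timestampMillis. */
  static String indexName(String prefix, long timestampMillis) {
    return prefix + '-' + UTC_DAY.format(Instant.ofEpochMilli(timestampMillis));
  }

  /** Same naming in an arbitrary zone, to show why pinning the zone matters. */
  static String indexNameIn(String prefix, long timestampMillis, ZoneId zone) {
    return prefix + '-' + DateTimeFormatter.ofPattern("yyyy-MM-dd")
        .withZone(zone)
        .format(Instant.ofEpochMilli(timestampMillis));
  }

  public static void main(String[] args) {
    long beforeMidnight = Instant.parse("2013-05-10T23:59:59Z").toEpochMilli();
    long afterMidnight = beforeMidnight + 1000L;
    System.out.println(indexName("flume", beforeMidnight)); // flume-2013-05-10
    System.out.println(indexName("flume", afterMidnight));  // flume-2013-05-11
    // The same instant is already the next day in Tokyo (UTC+9).
    System.out.println(
        indexNameIn("flume", beforeMidnight, ZoneId.of("Asia/Tokyo"))); // flume-2013-05-11
  }
}
```

Because the zone is fixed, every agent writes an event to the same daily index regardless of the host's local time zone.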

http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index 1b3db14..3286412 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -33,11 +33,9 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SER
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
 
 import java.util.Arrays;
-import java.util.Date;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
@@ -55,7 +53,6 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
 import org.slf4j.Logger;
@@ -90,8 +87,6 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   private static final Logger logger = LoggerFactory
       .getLogger(ElasticSearchSink.class);
 
-  static final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd");
-
   // Used for testing
   private boolean isLocal = false;
   private final CounterGroup counterGroup = new CounterGroup();
@@ -108,7 +103,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
 
   private Node node;
   private Client client;
-  private ElasticSearchEventSerializer serializer;
+  private ElasticSearchIndexRequestBuilderFactory indexRequestFactory;
   private SinkCounter sinkCounter;
 
   /**
@@ -145,7 +140,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
 
   @VisibleForTesting
   String getIndexName() {
-    return indexName + "-" + df.format(new Date());
+    return indexName;
   }
 
   @VisibleForTesting
@@ -166,7 +161,6 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
     Transaction txn = channel.getTransaction();
     try {
       txn.begin();
-      String indexName = getIndexName();
       BulkRequestBuilder bulkRequest = client.prepareBulk();
       for (int i = 0; i < batchSize; i++) {
         Event event = channel.take();
@@ -175,15 +169,15 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
           break;
         }
 
-        XContentBuilder builder = serializer.getContentBuilder(event);
-        IndexRequestBuilder request = client.prepareIndex(indexName, indexType)
-            .setSource(builder);
+        IndexRequestBuilder indexRequest =
+            indexRequestFactory.createIndexRequest(
+                client, indexName, indexType, event);
 
         if (ttlMs > 0) {
-          request.setTTL(ttlMs);
+          indexRequest.setTTL(ttlMs);
         }
 
-        bulkRequest.add(request);
+        bulkRequest.add(indexRequest);
       }
 
       int size = bulkRequest.numberOfActions();
@@ -291,10 +285,20 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
 
     try {
       @SuppressWarnings("unchecked")
-      Class<? extends ElasticSearchEventSerializer> clazz = (Class<? extends ElasticSearchEventSerializer>) Class
+      Class<? extends Configurable> clazz = (Class<? extends Configurable>) Class
           .forName(serializerClazz);
-      serializer = clazz.newInstance();
-      serializer.configure(serializerContext);
+      Configurable serializer = clazz.newInstance();
+      if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) {
+        indexRequestFactory = (ElasticSearchIndexRequestBuilderFactory) serializer;
+      } else if (serializer instanceof ElasticSearchEventSerializer) {
+        indexRequestFactory = new EventSerializerIndexRequestBuilderFactory(
+            (ElasticSearchEventSerializer) serializer);
+      } else {
+        throw new IllegalArgumentException(
+            serializerClazz + " is neither an ElasticSearchEventSerializer"
+            + " nor an ElasticSearchIndexRequestBuilderFactory.");
+      }
+      indexRequestFactory.configure(serializerContext);
     } catch (Exception e) {
       logger.error("Could not instantiate event serializer.", e);
       Throwables.propagate(e);
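The patched configure() loads the serializer class reflectively and then dispatches on its type: a factory is used directly, a legacy serializer is wrapped, and anything else is rejected. A hedged, dependency-free sketch of that decision follows; `RequestFactory` and `LegacySerializer` are hypothetical stand-ins for `ElasticSearchIndexRequestBuilderFactory` and `ElasticSearchEventSerializer`, and the lambda returned for a legacy serializer plays the role of `EventSerializerIndexRequestBuilderFactory`.

```java
/** Hypothetical sketch of the serializer-type dispatch in configure(). */
public final class SerializerDispatch {

  /** Stand-in for ElasticSearchIndexRequestBuilderFactory. */
  interface RequestFactory {
    String createRequest(String body);
  }

  /** Stand-in for the deprecated ElasticSearchEventSerializer. */
  interface LegacySerializer {
    String serialize(String body);
  }

  private SerializerDispatch() {}

  /** Accepts either interface so the sink only ever talks to a RequestFactory. */
  static RequestFactory resolve(Object candidate, String className) {
    if (candidate instanceof RequestFactory) {
      return (RequestFactory) candidate;
    } else if (candidate instanceof LegacySerializer) {
      LegacySerializer legacy = (LegacySerializer) candidate;
      // Adapter: wrap the legacy serializer behind the factory interface.
      return body -> "index:" + legacy.serialize(body);
    }
    throw new IllegalArgumentException(className
        + " is neither a LegacySerializer nor a RequestFactory.");
  }

  /** Instantiates a class by name via its no-arg constructor. */
  static Object instantiate(String className) throws ReflectiveOperationException {
    return Class.forName(className).getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    LegacySerializer upper = body -> body.toUpperCase();
    System.out.println(resolve(upper, "upper").createRequest("hi")); // index:HI
    try {
      resolve(instantiate("java.lang.StringBuilder"), "java.lang.StringBuilder");
    } catch (IllegalArgumentException expected) {
      // java.lang.StringBuilder is neither a LegacySerializer nor a RequestFactory.
      System.out.println(expected.getMessage());
    }
  }
}
```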

http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
new file mode 100644
index 0000000..c71b2e5
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.common.io.BytesStream;
+
+/**
+ * Default implementation of {@link ElasticSearchIndexRequestBuilderFactory}.
+ * It serializes flume events using the
+ * {@link ElasticSearchEventSerializer} instance configured on the sink.
+ */
+public class EventSerializerIndexRequestBuilderFactory
+  extends AbstractElasticSearchIndexRequestBuilderFactory {
+
+  protected final ElasticSearchEventSerializer serializer;
+
+  public EventSerializerIndexRequestBuilderFactory(
+      ElasticSearchEventSerializer serializer) {
+    this(serializer, ElasticSearchIndexRequestBuilderFactory.df);
+  }
+
+  protected EventSerializerIndexRequestBuilderFactory(
+      ElasticSearchEventSerializer serializer, FastDateFormat fdf) {
+    super(fdf);
+    this.serializer = serializer;
+  }
+
+  @Override
+  public void configure(Context context) {
+    serializer.configure(context);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration config) {
+    serializer.configure(config);
+  }
+
+  @Override
+  protected void prepareIndexRequest(IndexRequestBuilder indexRequest,
+      String indexName, String indexType, Event event) throws IOException {
+    BytesStream contentBuilder = serializer.getContentBuilder(event);
+    indexRequest.setIndex(indexName)
+        .setType(indexType)
+        .setSource(contentBuilder.bytes());
+  }
+}
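AbstractElasticSearchIndexRequestBuilderFactory is a template method: createIndexRequest fixes the sequence (compute the index name, then prepare the request), and subclasses such as EventSerializerIndexRequestBuilderFactory fill in prepareIndexRequest. A minimal sketch of the same shape follows. It assumes a hypothetical mutable `Request` in place of `IndexRequestBuilder` and a `Function<String, String>` in place of the serializer, and it takes the timestamp as a parameter rather than reading it from event headers.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

/** Hypothetical sketch of the factory's template-method structure. */
public final class TemplateMethodSketch {

  /** Stand-in for IndexRequestBuilder. */
  static final class Request {
    String index;
    String type;
    String source;
  }

  /** Mirrors the shape of AbstractElasticSearchIndexRequestBuilderFactory. */
  abstract static class AbstractFactory {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Declared final here for emphasis; the patch's createIndexRequest is not final.
    final Request createRequest(String indexPrefix, String indexType,
        long timestamp, String body) {
      Request request = new Request();
      prepare(request, indexName(indexPrefix, timestamp), indexType, body);
      return request;
    }

    /** Overridable hook, like getIndexName(String, long). */
    protected String indexName(String prefix, long timestamp) {
      return prefix + '-' + DAY.format(Instant.ofEpochMilli(timestamp));
    }

    /** Abstract hook, like prepareIndexRequest(...). */
    protected abstract void prepare(Request request, String indexName,
        String indexType, String body);
  }

  /** Analogue of EventSerializerIndexRequestBuilderFactory. */
  static final class SerializerFactory extends AbstractFactory {
    private final Function<String, String> serializer;

    SerializerFactory(Function<String, String> serializer) {
      this.serializer = serializer;
    }

    @Override
    protected void prepare(Request request, String indexName,
        String indexType, String body) {
      request.index = indexName;
      request.type = indexType;
      request.source = serializer.apply(body);
    }
  }

  public static void main(String[] args) {
    Request r = new SerializerFactory(b -> "{\"body\":\"" + b + "\"}")
        .createRequest("flume", "logs", 123456789L, "hello");
    // Prints: flume-1970-01-02 logs {"body":"hello"}
    System.out.println(r.index + " " + r.type + " " + r.source);
  }
}
```

Overriding `indexName` is the analogue of overriding getIndexName(String, long), for example to switch to monthly indices.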

http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
index 2edacdc..ecbdd99 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.Date;
 import java.util.Map;
 
 import org.apache.flume.Channel;
@@ -48,12 +47,16 @@ import org.elasticsearch.node.NodeBuilder;
 import org.elasticsearch.node.internal.InternalNode;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.joda.time.DateTimeUtils;
+import org.junit.After;
+import org.junit.Before;
 
 public abstract class AbstractElasticSearchSinkTest {
 
   static final String DEFAULT_INDEX_NAME = "flume";
   static final String DEFAULT_INDEX_TYPE = "log";
   static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+  static final long FIXED_TIME_MILLIS = 123456789L;
 
   Node node;
   Client client;
@@ -68,8 +71,8 @@ public abstract class AbstractElasticSearchSinkTest {
     parameters.put(BATCH_SIZE, "1");
     parameters.put(TTL, "5");
 
-    timestampedIndexName = DEFAULT_INDEX_NAME + "-"
-        + ElasticSearchSink.df.format(new Date());
+    timestampedIndexName = DEFAULT_INDEX_NAME + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS);
   }
 
   void createNodes() throws Exception {
@@ -94,6 +97,16 @@ public abstract class AbstractElasticSearchSinkTest {
     node.close();
   }
 
+  @Before
+  public void setFixedJodaTime() {
+    DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS);
+  }
+
+  @After
+  public void resetJodaTime() {
+    DateTimeUtils.setCurrentMillisSystem();
+  }
+
   Channel bindAndStartChannel(ElasticSearchSink fixture) {
     // Configure the channel
     Channel channel = new MemoryChannel();
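The tests pin Joda-Time's clock with DateTimeUtils.setCurrentMillisFixed and must reset it in an @After method, because that override is process-wide. An alternative, sketched here with `java.time.Clock` (a swapped-in technique, not what the patch uses) and a hypothetical `FixedClockIndexName` class, is to inject the clock so each test owns its own time source and nothing needs resetting.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch: index naming driven by an injected clock. */
public final class FixedClockIndexName {

  private static final DateTimeFormatter DAY =
      DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

  private final Clock clock;

  FixedClockIndexName(Clock clock) {
    this.clock = clock;
  }

  /** Index name for an event with no timestamp header: falls back to the clock. */
  String indexNameForNow(String prefix) {
    return prefix + '-' + DAY.format(Instant.ofEpochMilli(clock.millis()));
  }

  public static void main(String[] args) {
    // Same value as FIXED_TIME_MILLIS in AbstractElasticSearchSinkTest.
    Clock fixed = Clock.fixed(Instant.ofEpochMilli(123456789L), ZoneOffset.UTC);
    System.out.println(new FixedClockIndexName(fixed).indexNameForNow("flume")); // flume-1970-01-02
  }
}
```

Production code would pass `Clock.systemUTC()`; tests pass a fixed clock.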

http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
new file mode 100644
index 0000000..1e4e119
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.event.SimpleEvent;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.io.FastByteArrayOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestElasticSearchIndexRequestBuilderFactory
+    extends AbstractElasticSearchSinkTest {
+
+  private static final Client FAKE_CLIENT = null;
+
+  private EventSerializerIndexRequestBuilderFactory factory;
+
+  private FakeEventSerializer serializer;
+
+  @Before
+  public void setupFactory() throws Exception {
+    serializer = new FakeEventSerializer();
+    factory = new EventSerializerIndexRequestBuilderFactory(serializer) {
+      @Override
+      IndexRequestBuilder prepareIndex(Client client) {
+        return new IndexRequestBuilder(FAKE_CLIENT);
+      }
+    };
+  }
+
+  @Test
+  public void shouldUseUtcAsBasisForDateFormat() {
+    assertEquals("Coordinated Universal Time",
+        factory.fastDateFormat.getTimeZone().getDisplayName());
+  }
+
+  @Test
+  public void indexNameShouldBePrefixDashFormattedTimestamp() {
+    long millis = 987654321L;
+    assertEquals("prefix-"+factory.fastDateFormat.format(millis),
+        factory.getIndexName("prefix", millis));
+  }
+
+  @Test
+  public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp());
+    assertEquals(String.valueOf(FIXED_TIME_MILLIS),
+        timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("timestamp", "-321");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-321L, timestampedEvent.getTimestamp());
+    assertEquals("-321", timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("@timestamp", "-999");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-999L, timestampedEvent.getTimestamp());
+    assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp"));
+    assertNull(timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    base.setBody(new byte[] {1,2,3,4});
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("foo", "bar");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals("bar", timestampedEvent.getHeaders().get("foo"));
+    assertArrayEquals(base.getBody(), timestampedEvent.getBody());
+  }
+
+  @Test
+  public void shouldSetIndexNameTypeAndSerializedEventIntoIndexRequest()
+      throws Exception {
+
+    String indexPrefix = "qwerty";
+    String indexType = "uiop";
+    Event event = new SimpleEvent();
+
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        FAKE_CLIENT, indexPrefix, indexType, event);
+
+    assertEquals(indexPrefix + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
+      indexRequestBuilder.request().index());
+    assertEquals(indexType, indexRequestBuilder.request().type());
+    assertArrayEquals(FakeEventSerializer.FAKE_BYTES,
+        indexRequestBuilder.request().source().array());
+  }
+
+  @Test
+  public void shouldSetIndexNameFromTimestampHeaderWhenPresent()
+      throws Exception {
+    String indexPrefix = "qwerty";
+    String indexType = "uiop";
+    Event event = new SimpleEvent();
+    event.getHeaders().put("timestamp", "1213141516");
+
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        null, indexPrefix, indexType, event);
+
+    assertEquals(indexPrefix + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(1213141516L),
+      indexRequestBuilder.request().index());
+  }
+
+  @Test
+  public void shouldConfigureEventSerializer() throws Exception {
+    assertFalse(serializer.configuredWithContext);
+    factory.configure(new Context());
+    assertTrue(serializer.configuredWithContext);
+
+    assertFalse(serializer.configuredWithComponentConfiguration);
+    factory.configure(new SinkConfiguration("name"));
+    assertTrue(serializer.configuredWithComponentConfiguration);
+  }
+
+}
+
+class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+  static final byte[] FAKE_BYTES = new byte[] {9,8,7,6};
+  boolean configuredWithContext, configuredWithComponentConfiguration;
+
+  @Override
+  public BytesStream getContentBuilder(Event event) throws IOException {
+    FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4);
+    fbaos.write(FAKE_BYTES);
+    return fbaos;
+  }
+
+  @Override
+  public void configure(Context arg0) {
+    configuredWithContext = true;
+  }
+
+  @Override
+  public void configure(ComponentConfiguration arg0) {
+    configuredWithComponentConfiguration = true;
+  }
+}

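The `TimestampedEvent` tests above pin down a precedence rule: an existing `timestamp` header wins, then `@timestamp`, and only when neither is present is the current time used and written back as a `timestamp` header; the body and other headers are preserved. A hypothetical plain-Java sketch of that rule, working on a header map only (`TimestampSketch` is not the Flume class, and an injectable `LongSupplier` clock stands in for Joda's fixed time):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class TimestampSketch {
  final Map<String, String> headers;
  final long timestamp;

  // Hypothetical constructor: copies the headers, then resolves the timestamp.
  TimestampSketch(Map<String, String> original, LongSupplier clock) {
    headers = new HashMap<>(original); // leave the caller's map untouched
    String ts = headers.get("timestamp");
    if (ts == null || ts.isEmpty()) {
      ts = headers.get("@timestamp"); // fall back to the @timestamp header
    }
    if (ts == null || ts.isEmpty()) {
      long now = clock.getAsLong();
      headers.put("timestamp", String.valueOf(now)); // ensure the header exists
      timestamp = now;
    } else {
      timestamp = Long.parseLong(ts);
    }
  }

  public static void main(String[] args) {
    LongSupplier fixedClock = () -> 123456789L;
    Map<String, String> atOnly = new HashMap<>();
    atOnly.put("@timestamp", "-999");
    TimestampSketch e = new TimestampSketch(atOnly, fixedClock);
    System.out.println(e.timestamp + " " + e.headers.get("timestamp")); // prints -999 null
  }
}
```

As in `shouldUseExistingAtTimestampHeaderInTimestampedEvent`, an event carrying only `@timestamp` keeps that value and does not gain a `timestamp` header.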
http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 94b95b1..ad40a3c 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -24,20 +24,27 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEF
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import java.util.Date;
+import java.io.IOException;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Sink.Status;
 import org.apache.flume.Transaction;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.UUID;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -112,6 +119,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     assertMatchAllQuery(numberOfEvents, events);
     assertBodyQuery(5, events);
   }
+
   @Test
   public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
     parameters.put(BATCH_SIZE, "2");
@@ -163,9 +171,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
         "10.5.5.27", DEFAULT_PORT) };
 
     assertEquals("testing-cluster-name", fixture.getClusterName());
-    assertEquals(
-        "testing-index-name-" + ElasticSearchSink.df.format(new Date()),
-        fixture.getIndexName());
+    assertEquals("testing-index-name", fixture.getIndexName());
     assertEquals("testing-index-type", fixture.getIndexType());
     assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs());
     assertArrayEquals(expected, fixture.getServerAddresses());
@@ -184,9 +190,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
         "10.5.5.27", DEFAULT_PORT) };
 
-    assertEquals(
-        DEFAULT_INDEX_NAME + "-" + ElasticSearchSink.df.format(new Date()),
-        fixture.getIndexName());
+    assertEquals(DEFAULT_INDEX_NAME, fixture.getIndexName());
     assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
     assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName());
     assertArrayEquals(expected, fixture.getServerAddresses());
@@ -221,4 +225,93 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
 
     assertArrayEquals(expected, fixture.getServerAddresses());
   }
+
+  @Test
+  public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory()
+      throws Exception {
+
+    parameters.put(SERIALIZER,
+        CustomElasticSearchIndexRequestBuilderFactory.class.getName());
+
+    Configurables.configure(fixture, new Context(parameters));
+
+    Channel channel = bindAndStartChannel(fixture);
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    String body = "{ foo: \"bar\" }";
+    Event event = EventBuilder.withBody(body.getBytes());
+    channel.put(event);
+    tx.commit();
+    tx.close();
+
+    fixture.process();
+    fixture.stop();
+
+    assertEquals(fixture.getIndexName()+"-05_17_36_789",
+        CustomElasticSearchIndexRequestBuilderFactory.actualIndexName);
+    assertEquals(fixture.getIndexType(),
+        CustomElasticSearchIndexRequestBuilderFactory.actualIndexType);
+    assertArrayEquals(event.getBody(),
+        CustomElasticSearchIndexRequestBuilderFactory.actualEventBody);
+    assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext);
+  }
+
+  public static final class CustomElasticSearchIndexRequestBuilderFactory
+      extends AbstractElasticSearchIndexRequestBuilderFactory {
+
+    static String actualIndexName, actualIndexType;
+    static byte[] actualEventBody;
+    static boolean hasContext;
+
+    public CustomElasticSearchIndexRequestBuilderFactory() {
+      super(FastDateFormat.getInstance("HH_mm_ss_SSS",
+          TimeZone.getTimeZone("EST5EDT")));
+    }
+
+    @Override
+    protected void prepareIndexRequest(IndexRequestBuilder indexRequest,
+        String indexName, String indexType, Event event) throws IOException {
+      actualIndexName = indexName;
+      actualIndexType = indexType;
+      actualEventBody = event.getBody();
+      indexRequest.setIndex(indexName).setType(indexType)
+          .setSource(event.getBody());
+    }
+
+    @Override
+    public void configure(Context arg0) {
+      hasContext = true;
+    }
+
+    @Override
+    public void configure(ComponentConfiguration arg0) {
+      //no-op
+    }
+  }
+
+  @Test
+  public void shouldFailToConfigureWithInvalidSerializerClass()
+      throws Exception {
+
+    parameters.put(SERIALIZER, "java.lang.String");
+    try {
+      Configurables.configure(fixture, new Context(parameters));
+    } catch (ClassCastException e) {
+      // expected
+    }
+
+    parameters.put(SERIALIZER, FakeConfigurable.class.getName());
+    try {
+      Configurables.configure(fixture, new Context(parameters));
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+  public static class FakeConfigurable implements Configurable {
+    @Override
+    public void configure(Context arg0) {
+        // no-op
+    }
+  }
 }

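The expected suffix `05_17_36_789` in `shouldAllowCustomElasticSearchIndexRequestBuilderFactory` follows from the fixed clock: the event has no timestamp header, so `FIXED_TIME_MILLIS` (123456789 ms, i.e. 1970-01-02 10:17:36.789 UTC) is used, and `EST5EDT` is UTC-5 in January, giving 05:17:36.789. The arithmetic can be checked with a small sketch that substitutes `java.time`'s `DateTimeFormatter` for Commons Lang's `FastDateFormat` (the class name is hypothetical):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class CustomSuffixSketch {
  // Same pattern and zone that CustomElasticSearchIndexRequestBuilderFactory
  // passes to FastDateFormat.getInstance(...).
  static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("HH_mm_ss_SSS").withZone(ZoneId.of("EST5EDT"));

  static String suffix(long millis) {
    return FMT.format(Instant.ofEpochMilli(millis));
  }

  public static void main(String[] args) {
    // FIXED_TIME_MILLIS from AbstractElasticSearchSinkTest
    System.out.println(suffix(123456789L)); // prints 05_17_36_789
  }
}
```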