rionmonster opened a new pull request #17061:
URL: https://github.com/apache/flink/pull/17061


   ## What is the purpose of the change
   
   Currently, the existing `ElasticsearchSink` implementation supports dynamic, 
index-based routing at runtime; however, routing to separate ES clusters 
requires every sink to be known when the job starts, which may not always be 
possible.
   
   This pull request implements a `DynamicElasticsearchSink` that accepts an 
`ElasticsearchSinkRouter` implementation, which resolves a unique route and a 
sink from each incoming element. This **allows elements to be routed not only 
to dynamic indices but also to dynamic Elasticsearch sinks at runtime**.
   
   The sink itself functions as a wrapper that holds an in-memory cache mapping 
each route (i.e. key) to its corresponding sink. When a previously unseen route 
is encountered, a new sink is created and cached; otherwise, the cached sink 
for that route is used.
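   The caching behavior described above can be sketched in plain Java. This is a 
simplified illustration, not the code in this PR: `SinkRouter` and `RouteCache` 
are hypothetical stand-ins with no Flink dependency, and the sink type is left 
generic.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, simplified stand-in for the PR's ElasticsearchSinkRouter
   // (T = element type, R = route/cache-key type, S = sink type).
   interface SinkRouter<T, R, S> {
       R getRoute(T element);

       S createSink(R route, T element);
   }

   // Minimal sketch of the route -> sink cache: a sink is created the first time
   // its route is seen and reused for every later element with the same route.
   final class RouteCache<T, R, S> {
       private final SinkRouter<T, R, S> router;
       private final Map<R, S> sinks = new HashMap<>();

       RouteCache(SinkRouter<T, R, S> router) {
           this.router = router;
       }

       S sinkFor(T element) {
           R route = router.getRoute(element);
           // computeIfAbsent only calls createSink for routes not yet in the cache
           return sinks.computeIfAbsent(route, r -> router.createSink(r, element));
       }

       int cachedSinks() {
           return sinks.size();
       }
   }
   ```

   A plain `HashMap` is assumed to be sufficient here because each parallel sink 
instance is expected to be invoked from a single thread; a cache shared across 
threads would need a concurrent map instead.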
   
   ### Example Usage
   
   An example usage might look something like this:
   
   ```java
   env
       .fromElements(
               // Define a series of Tuples where the first element represents the ES
               // host being targeted and the second is the message payload
               Tuple2.of(HttpHost.create(".../sink-a"), "message-a-1"),
               Tuple2.of(HttpHost.create(".../sink-b"), "message-b-1"),
               Tuple2.of(HttpHost.create(".../sink-a"), "message-a-2")
       )
       .addSink(
           new DynamicElasticsearchSink<>(
               new ElasticsearchSinkRouter<
                       // Element-type
                       Tuple2<HttpHost, String>,
                       // Route-type (used as the cache key for sinks)
                       String,
                       // Sink implementation for this route
                       ElasticsearchSink<Tuple2<HttpHost, String>>>() {
   
                   @Override
                   public String getRoute(Tuple2<HttpHost, String> element) {
                       // Construct a deterministic unique caching key to associate
                       // a given "route" to a corresponding sink
                       return element.f0.toHostString();
                   }
   
                   @Override
                   public ElasticsearchSink<Tuple2<HttpHost, String>> createSink(
                           String cacheKey, Tuple2<HttpHost, String> element) {
   
                       // Construct a new sink based on the configuration information
                       // for this element
                       ElasticsearchSink.Builder<Tuple2<HttpHost, String>> builder =
                               new ElasticsearchSink.Builder<>(
                                   List.of(element.f0),
                                   (ElasticsearchSinkFunction<Tuple2<HttpHost, String>>)
                                       (el, ctx, indexer) -> {
                                           // Construct an index request for `el`
                                           indexer.add(...);
                                       });
   
                       builder.setRestClientFactory(restClientBuilder -> {
                           // Apply any authentication here if that information was
                           // available within your routing / element
                       });
   
                       return builder.build();
                   }
               }
           )
       );
   ```
   
   This example takes a series of `Tuple2<HttpHost, String>` elements, where 
the `HttpHost` represents the host of a given Elasticsearch cluster and the 
`String` is the payload. These elements are passed to an 
`ElasticsearchSinkRouter` instance, which, given an element, can do the 
following:
   
   - Resolve the route identifier `RouteT` that is used to associate a specific 
route with a given sink (generally you will want this to be deterministically 
unique).
   - Create a sink from a given element that corresponds to the route 
identifier mentioned previously. The example above simply uses an `HttpHost` 
instance for simplicity's sake, but you could easily construct a more robust 
object to store additional metadata about your sink (e.g. authentication 
information, specific configuration, etc.) that would be passed to the sink 
during construction.
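   As a hedged illustration of the "more robust object" idea, the hypothetical 
`ClusterTarget` and `RoutedMessage` classes below bundle connection metadata 
with a deterministic route key. The class names, fields, and key format are 
assumptions for this sketch, not part of the PR; such an object could replace 
the `HttpHost` in the element type, with `getRoute()` returning `routeKey()`.

   ```java
   import java.util.Objects;

   // Hypothetical routing metadata for one Elasticsearch cluster; not part of the PR.
   final class ClusterTarget {
       final String scheme;
       final String host;
       final int port;
       // Hypothetical auth metadata that createSink() could apply in a RestClientFactory
       final String username;
       final String password;

       ClusterTarget(String scheme, String host, int port, String username, String password) {
           this.scheme = Objects.requireNonNull(scheme);
           this.host = Objects.requireNonNull(host);
           this.port = port;
           this.username = Objects.requireNonNull(username);
           this.password = Objects.requireNonNull(password);
       }

       // Deterministic, unique route key: equal targets always yield the same key,
       // so they resolve to the same cached sink. The password is deliberately
       // excluded so secrets never end up in a cache key.
       String routeKey() {
           return scheme + "://" + username + "@" + host + ":" + port;
       }
   }

   // Hypothetical element type that could replace Tuple2<HttpHost, String>.
   final class RoutedMessage {
       final ClusterTarget target;
       final String payload;

       RoutedMessage(ClusterTarget target, String payload) {
           this.target = Objects.requireNonNull(target);
           this.payload = Objects.requireNonNull(payload);
       }
   }
   ```

   With this key, elements that target the same cluster as the same user share 
one sink; if one user could connect with different credentials, the key would 
need to distinguish those cases too.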
   
   Next, when the dynamic sink is invoked, it checks the cache to see whether 
the route has been seen before. If so, it uses the cached sink for that route; 
otherwise, it creates and configures a new sink for the route (according to the 
`ElasticsearchSinkRouter.createSink()` function), stores it in the cache, and 
then invokes the new sink.
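   The invoke path above can be sketched as follows, again as a simplified, 
hypothetical model rather than the PR's implementation. `LifecycleSink` and 
`DynamicSinkSketch` are invented names; the sketch assumes that a sink created 
at runtime must be opened before its first element (a wrapped Flink sink would 
likely also need its runtime context set) and that every cached sink must be 
closed when the wrapper closes.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.BiFunction;
   import java.util.function.Function;

   // Hypothetical sink lifecycle (open -> invoke* -> close), loosely modeled on
   // a Flink sink function; this is not the real Flink API.
   interface LifecycleSink<T> {
       void open();

       void invoke(T value);

       void close();
   }

   // Sketch of the dynamic sink's invoke path: resolve the route, lazily create
   // and open a sink for unseen routes, cache it, then forward the element.
   final class DynamicSinkSketch<T, R> {
       private final Function<T, R> getRoute;
       private final BiFunction<R, T, LifecycleSink<T>> createSink;
       private final Map<R, LifecycleSink<T>> sinks = new HashMap<>();

       DynamicSinkSketch(Function<T, R> getRoute, BiFunction<R, T, LifecycleSink<T>> createSink) {
           this.getRoute = getRoute;
           this.createSink = createSink;
       }

       void invoke(T element) {
           R route = getRoute.apply(element);
           LifecycleSink<T> sink = sinks.get(route);
           if (sink == null) {
               sink = createSink.apply(route, element);
               // Open before caching, so a sink whose open() throws is never stored.
               sink.open();
               sinks.put(route, sink);
           }
           sink.invoke(element);
       }

       // Close every cached sink, continuing past failures and rethrowing the first one.
       void close() {
           RuntimeException first = null;
           for (LifecycleSink<T> sink : sinks.values()) {
               try {
                   sink.close();
               } catch (RuntimeException e) {
                   if (first == null) {
                       first = e;
                   } else {
                       first.addSuppressed(e);
                   }
               }
           }
           sinks.clear();
           if (first != null) {
               throw first;
           }
       }

       int cachedSinks() {
           return sinks.size();
       }
   }
   ```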
   
   I’d appreciate any feedback or suggestions, especially from anyone familiar 
with the ES connector implementations.
   
   ## Brief change log
   
   - *Added `DynamicElasticsearchSink` and related `ElasticsearchSinkRouter` 
interface to support dynamic sink creation and routing at runtime.*
   - *Added a series of tests that leverage some of the existing functionality 
found within the `ElasticsearchSinkBaseTest` test suite*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - *Added a test verifying that each unique route from incoming elements 
"registers" a new Elasticsearch sink instance.*
   - *Added a test verifying that existing sinks are cached based on the route 
key (e.g. two incoming elements that resolve to the same route will be sent to 
the same sink).*
   - *Added a test verifying that a series of incoming elements are routed to 
two separate sinks based on their routes and that each underlying sink contains 
the elements ready to be flushed as pending requests.*
   - *Added a series of static helper classes to assist with further testing 
(e.g. `DummyDynamicElasticsearchSink`, `DummyElasticsearchSinkRouter` etc.)*
   - *Locally tested via integration tests (Elasticsearch TestContainers) for 
specific use-cases against multiple clusters*
   - *Tested using production-like data within cloud-based test environments 
running against Elasticsearch clusters for specific business use cases*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **No**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **No**
     - The serializers: **No**
     - The runtime per-record code paths (performance sensitive): **No**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **No**
     - The S3 file system connector: **No**
     
   ## Documentation
   
     - Does this pull request introduce a new feature? **Yes**
     - If yes, how is the feature documented? **Javadocs**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
