rionmonster opened a new pull request #17061:
URL: https://github.com/apache/flink/pull/17061
## What is the purpose of the change
Currently, the existing `ElasticsearchSink` implementation supports dynamic,
index-based routing at runtime; however, routing to separate ES clusters
requires every sink to be known when the job starts up, which may not always be
possible.
This pull request implements a `DynamicElasticsearchSink` that accepts an
`ElasticsearchSinkRouter` implementation, which resolves a unique route and a
corresponding sink from each incoming element. This **allows elements to be
routed not only to dynamic indices but also to dynamic Elasticsearch sinks at
runtime**.
The sink itself functions as a wrapper around an in-memory cache that maps
each route (i.e. key) to its corresponding sink. When a previously unseen route
is encountered, a new sink is created and cached; otherwise, the existing sink
for that route is used.
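The caching behaviour described above can be sketched in plain Java, independent of Flink. This is a minimal, hypothetical model: the `RouteCache` class and its `sinkFor` method are illustrative names and not part of this PR, and it assumes every route key implements `equals`/`hashCode` consistently. A plain `HashMap` is assumed to be sufficient because a sink's `invoke` is called from a single task thread per parallel subtask.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: caches one sink per route and creates sinks lazily.
// RouteT is the cache key; SinkT is whatever sink type a route maps to.
final class RouteCache<RouteT, SinkT> {
    private final Map<RouteT, SinkT> sinks = new HashMap<>();
    private final Function<RouteT, SinkT> factory;
    private int created = 0;

    RouteCache(Function<RouteT, SinkT> factory) {
        this.factory = factory;
    }

    // Returns the cached sink for this route, creating (and caching) it on first use.
    SinkT sinkFor(RouteT route) {
        return sinks.computeIfAbsent(route, r -> {
            created++; // only runs on a cache miss
            return factory.apply(r);
        });
    }

    // Number of times the factory was actually invoked.
    int createdCount() {
        return created;
    }

    int size() {
        return sinks.size();
    }
}
```

For instance, calling `sinkFor("sink-a")` twice returns the same instance and invokes the factory only once.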
### Example Usage
An example usage might look something like this:
```java
env
    .fromElements(
        // Define a series of Tuples where the first element represents the ES
        // host being targeted and the second is the message payload
        Tuple2.of(HttpHost.create(".../sink-a"), "message-a-1"),
        Tuple2.of(HttpHost.create(".../sink-b"), "message-b-1"),
        Tuple2.of(HttpHost.create(".../sink-a"), "message-a-2")
    )
    .addSink(
        new DynamicElasticsearchSink<>(
            new ElasticsearchSinkRouter<
                    // Element-type
                    Tuple2<HttpHost, String>,
                    // Route-type (used to seed the hashmap / cache sinks)
                    String,
                    // Sink implementation for this route
                    ElasticsearchSink<Tuple2<HttpHost, String>>>() {

                @Override
                public String getRoute(Tuple2<HttpHost, String> element) {
                    // Construct a deterministic unique caching key to associate
                    // a given "route" to a corresponding sink
                    return element.f0.toHostString();
                }

                @Override
                public ElasticsearchSink<Tuple2<HttpHost, String>> createSink(
                        String cacheKey, Tuple2<HttpHost, String> element) {
                    // Construct a new sink based on the configuration information
                    // for this element
                    ElasticsearchSink.Builder<Tuple2<HttpHost, String>> builder =
                        new ElasticsearchSink.Builder<>(
                            List.of(element.f0),
                            (ElasticsearchSinkFunction<Tuple2<HttpHost, String>>)
                                (el, ctx, indexer) -> {
                                    // Construct index request.
                                    indexer.add(...);
                                });

                    builder.setRestClientFactory(restClientBuilder -> {
                        // Apply any authentication here if that information is
                        // available within your routing / element
                    });

                    return builder.build();
                }
            }
        )
    );
```
This example takes a series of `Tuple2<HttpHost, String>` elements (where
the `HttpHost` represents the host of a given Elasticsearch cluster and the
`String` is the payload) and passes them to an `ElasticsearchSinkRouter`
instance, which, given an element, can do the following:
- Resolve the route identifier (of type `RouteT`) that is used to associate a
specific route with a given sink (generally you will want this identifier to be
deterministically unique).
- Create a sink from a given element that corresponds to the route
identifier mentioned previously. The example above simply uses an `HttpHost`
instance for simplicity's sake, but you could construct a more robust
object to store additional metadata about your sink (e.g. authentication
information, specific configuration, etc.) that would be passed to the sink
during construction.
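As a hypothetical illustration of such a richer route object, the value class below bundles a cluster URI with the user that writes to it. `EsClusterRoute` and its `hostUri`/`username` fields are assumptions for illustration only, not part of this PR. Because the route doubles as the cache key, the sketch implements `equals` and `hashCode` over every identifying field, so two elements that target the same cluster with the same user resolve to the same cached sink.

```java
import java.util.Objects;

// Hypothetical route key: identifies one target Elasticsearch cluster plus the
// user used to reach it. Immutable, so it is safe to use as a HashMap key.
final class EsClusterRoute {
    private final String hostUri;
    private final String username;

    EsClusterRoute(String hostUri, String username) {
        this.hostUri = Objects.requireNonNull(hostUri, "hostUri");
        this.username = Objects.requireNonNull(username, "username");
    }

    String hostUri() {
        return hostUri;
    }

    String username() {
        return username;
    }

    // equals/hashCode cover every identifying field, so equal routes
    // always map to the same cached sink.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof EsClusterRoute)) {
            return false;
        }
        EsClusterRoute other = (EsClusterRoute) o;
        return hostUri.equals(other.hostUri) && username.equals(other.username);
    }

    @Override
    public int hashCode() {
        return Objects.hash(hostUri, username);
    }

    @Override
    public String toString() {
        return "EsClusterRoute{host=" + hostUri + ", user=" + username + "}";
    }
}
```

Secrets such as passwords are deliberately left out of this key in the sketch; they could be looked up in `createSink()` instead of being carried in the cache key.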
Next, when the dynamic sink is invoked, it checks the cache to see whether
the route has been seen before. If so, it uses the sink for that route;
otherwise, it creates and configures a new sink (according to the
`ElasticsearchSinkRouter.createSink()` function), stores it within the cache,
and subsequently invokes the new sink.
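The invoke flow above can be modelled without Flink as a rough sketch. Everything here is hypothetical: `Msg` stands in for `Tuple2<HttpHost, String>`, `Router` mirrors the shape of `ElasticsearchSinkRouter` with sinks modelled as `java.util.function.Consumer`s, and `RecordingSink` simply records payloads instead of indexing them. The `invoke` method follows the steps literally: resolve the route, check the cache, create and store a sink on a miss, then hand the element to that sink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical element type: target host plus payload.
final class Msg {
    final String host;
    final String payload;

    Msg(String host, String payload) {
        this.host = host;
        this.payload = payload;
    }
}

// Hypothetical stand-in for ElasticsearchSinkRouter.
interface Router<T, RouteT, SinkT extends Consumer<T>> {
    RouteT getRoute(T element);

    SinkT createSink(RouteT route, T element);
}

// Toy sink that records payloads instead of sending them to Elasticsearch.
final class RecordingSink implements Consumer<Msg> {
    final List<String> received = new ArrayList<>();

    @Override
    public void accept(Msg msg) {
        received.add(msg.payload);
    }
}

// Flink-free model of the dynamic sink's dispatch step.
final class Dispatcher<T, RouteT, SinkT extends Consumer<T>> {
    private final Router<T, RouteT, SinkT> router;
    private final Map<RouteT, SinkT> cache = new HashMap<>();

    Dispatcher(Router<T, RouteT, SinkT> router) {
        this.router = router;
    }

    void invoke(T element) {
        RouteT route = router.getRoute(element);
        SinkT sink = cache.get(route);              // 1. check the cache
        if (sink == null) {
            sink = router.createSink(route, element); // 2. unseen route: create a sink
            cache.put(route, sink);                  // 3. store it for later elements
        }
        sink.accept(element);                        // 4. invoke the (new or cached) sink
    }

    SinkT sinkFor(RouteT route) {
        return cache.get(route);
    }

    int sinkCount() {
        return cache.size();
    }
}
```

Feeding this model the three elements from the example above, routed by host, yields two sinks: the `sink-a` sink receives `message-a-1` and `message-a-2`, and the `sink-b` sink receives `message-b-1`.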
I’d appreciate any feedback or suggestions, especially from anyone familiar
with the ES connector implementations.
## Brief change log
- *Added `DynamicElasticsearchSink` and related `ElasticsearchSinkRouter`
interface to support dynamic sink creation and routing at runtime.*
- *Added a series of tests that leverage some of the existing functionality
found within the `ElasticsearchSinkBaseTest` test suite*
## Verifying this change
This change added tests and can be verified as follows:
- *Added test that verifies each unique route resolved from incoming elements
"registers" a new Elasticsearch sink instance.*
- *Added test that verifies existing sinks are cached based on the route key
(e.g. two incoming elements that resolve to the same route are sent to the
same sink).*
- *Added test to verify that a series of incoming elements are routed to two
separate sinks based on their routes and each underlying sink contains the
elements ready to be evicted as pending requests*
- *Added a series of static helper classes to assist with further testing
(e.g. `DummyDynamicElasticsearchSink`, `DummyElasticsearchSinkRouter` etc.)*
- *Locally tested via integration tests (Elasticsearch TestContainers) for
specific use-cases against multiple clusters*
- *Tested using production-like data within cloud-based test environments
running against Elasticsearch clusters for specific business use cases*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **No**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **No**
- The serializers: **No**
- The runtime per-record code paths (performance sensitive): **No**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **No**
- The S3 file system connector: **No**
## Documentation
- Does this pull request introduce a new feature? **Yes**
- If yes, how is the feature documented? **Javadocs**