pabloem commented on code in PR #24713:
URL: https://github.com/apache/beam/pull/24713#discussion_r1109095030
##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
return timestamp;
}
+ private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(java.time.Duration.ofSeconds(60))
+ .removalListener(
+ new RemovalListener<Map<String, String>, SourceTask>() {
+ @Override
+ public void onRemoval(
+ RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+ LOG.debug(
+ "Task for key [[{}]] is being removed. Cause: {}",
+ removalNotification.getKey(),
+ removalNotification.getCause());
+ removalNotification.getValue().stop();
+ }
+ })
+ .maximumSize(10)
+ .build();
+
+ SourceTask getDebeziumSourceTask(
+ Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
+ final KafkaSourceConsumerFn<T> dofnInstance = this;
+ try {
+ return CONNECTOR_CACHE.get(
+ element,
+ new Callable<SourceTask>() {
+ @Override
+ public SourceTask call() throws Exception {
+ SourceConnector connector = null;
+ SourceTask innerTask;
+ Map<String, String> configuration = new HashMap<>(element);
+ configuration.put(BEAM_INSTANCE_PROPERTY, dofnInstance.getHashCode());
+ try {
+ connector = connectorClass.getDeclaredConstructor().newInstance();
+ connector.start(configuration);
+ innerTask =
+ (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw new RuntimeException(
+ "Unable to initialize connector instance for Debezium", e);
+ }
+ Map<String, ?> consumerOffset = tracker.currentRestriction().offset;
+ LOG.debug("--------- Created new Debezium task with offset: {}", consumerOffset);
+
+ innerTask.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset));
Review Comment:
Oops, I'm hoping I didn't stomp on this patch...