lukecwik commented on code in PR #24713:
URL: https://github.com/apache/beam/pull/24713#discussion_r1094806645
##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
+  private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(java.time.Duration.ofSeconds(60))
+          .removalListener(
+              new RemovalListener<Map<String, String>, SourceTask>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+                  LOG.debug(
+                      "Task for key [[{}]] is being removed. Cause: {}",
+                      removalNotification.getKey(),
+                      removalNotification.getCause());
+                  removalNotification.getValue().stop();
+                }
+              })
+          .maximumSize(10)
+          .build();
+
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
+    final KafkaSourceConsumerFn<T> dofnInstance = this;
+    try {
+      return CONNECTOR_CACHE.get(
+          element,
+          new Callable<SourceTask>() {
+            @Override
+            public SourceTask call() throws Exception {
+              SourceConnector connector = null;
+              SourceTask innerTask;
+              Map<String, String> configuration = new HashMap<>(element);
+              configuration.put(BEAM_INSTANCE_PROPERTY, dofnInstance.getHashCode());
+              try {
+                connector = connectorClass.getDeclaredConstructor().newInstance();
+                connector.start(configuration);
+                innerTask =
+                    (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+              } catch (InstantiationException
+                  | IllegalAccessException
+                  | InvocationTargetException
+                  | NoSuchMethodException e) {
+                throw new RuntimeException(
+                    "Unable to initialize connector instance for Debezium", e);
+              }
+              Map<String, ?> consumerOffset = tracker.currentRestriction().offset;
+              LOG.debug("--------- Created new Debezium task with offset: {}", consumerOffset);
+
+              innerTask.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset));
Review Comment:
```suggestion
innerTask.initialize(new BeamSourceTaskContext(consumerOffset));
```
##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
+    final KafkaSourceConsumerFn<T> dofnInstance = this;
+    try {
Review Comment:
If a task is already in the cache, do we need to update the consumer offset on the task being returned? We currently only set it when the task is first created.
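One way to handle the cache-hit case, sketched in plain Java under stated assumptions: the restriction's offset is re-applied on every lookup, not only inside the loader. `OffsetAwareTask`, `seekTo`, and `getTask` are hypothetical stand-ins; Kafka Connect's `SourceTask` has no `seekTo` method, and whether re-initializing an already running `SourceTask` is supported would need to be checked.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class OffsetRefreshSketch {
  // Hypothetical stand-in for a cached source task whose read position can be reset.
  static final class OffsetAwareTask {
    private volatile Map<String, ?> offset;

    void seekTo(Map<String, ?> newOffset) {
      this.offset = newOffset;
    }

    Map<String, ?> currentOffset() {
      return offset;
    }
  }

  // Keyed by the element's configuration, like CONNECTOR_CACHE in the diff (no expiry here).
  private static final Map<Map<String, String>, OffsetAwareTask> CACHE = new ConcurrentHashMap<>();
  static final AtomicInteger CREATED = new AtomicInteger();

  static OffsetAwareTask getTask(Map<String, String> config, Map<String, ?> restrictionOffset) {
    OffsetAwareTask task =
        CACHE.computeIfAbsent(
            config,
            k -> {
              CREATED.incrementAndGet();
              return new OffsetAwareTask();
            });
    // Apply the current restriction's offset on every lookup, not only on first creation.
    task.seekTo(restrictionOffset);
    return task;
  }

  public static void main(String[] args) {
    Map<String, String> config = Map.of("name", "example-connector");
    OffsetAwareTask first = getTask(config, Map.of("lsn", 10L));
    OffsetAwareTask second = getTask(config, Map.of("lsn", 42L));
    System.out.println(first == second); // true: cache hit
    System.out.println(second.currentOffset()); // {lsn=42}
    System.out.println(CREATED.get()); // 1
  }
}
```

Resetting the offset on a shared instance is only safe if no other `element + restriction` pair is using the same task at the same time.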
##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
Review Comment:
Can the source task be used in parallel if there are multiple elements with the same configuration but different restrictions? Multiple `element + restriction` pairs can be executed in parallel, while the map is keyed only by the `element`.
If not, then you want to build an object pool, not an object cache.
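The difference the comment points at is ownership: a cache hands the same instance to every caller with the same key, whereas a pool lends each instance to one caller until it is given back. Below is a minimal keyed-pool sketch in plain Java; `KeyedPool`, `borrow`, and `release` are hypothetical names, `Object` stands in for `SourceTask`, and idle expiry plus calling `stop()` on evicted tasks (what the cache's removal listener does) are left out. An existing library such as Apache Commons Pool's `GenericKeyedObjectPool` may be a better fit than a hand-rolled pool.

```java
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function;

// Hypothetical minimal keyed object pool: an instance is owned by one borrower at a time.
final class KeyedPool<K, V> {
  private final Map<K, Deque<V>> idle = new ConcurrentHashMap<>();
  private final Function<K, V> factory;

  KeyedPool(Function<K, V> factory) {
    this.factory = factory;
  }

  // Returns an idle instance for this key, or creates a new one if none is free.
  V borrow(K key) {
    V pooled = idleFor(key).pollFirst();
    return pooled != null ? pooled : factory.apply(key);
  }

  // Makes the instance available again to later borrowers with the same key.
  void release(K key, V value) {
    idleFor(key).addFirst(value);
  }

  private Deque<V> idleFor(K key) {
    return idle.computeIfAbsent(key, k -> new ConcurrentLinkedDeque<>());
  }

  public static void main(String[] args) {
    KeyedPool<String, Object> pool = new KeyedPool<>(key -> new Object());
    Object a = pool.borrow("config-1");
    Object b = pool.borrow("config-1"); // a is still in use, so a second instance is created
    System.out.println(a != b); // true
    pool.release("config-1", a);
    System.out.println(pool.borrow("config-1") == a); // true: the released instance is reused
  }
}
```

Applied to `getDebeziumSourceTask`, this would mean borrowing a task per `element + restriction` pair and releasing it when processing of that restriction stops, so two parallel restrictions with the same configuration would never share a task.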
--
This is an automated message from the Apache Git Service.