lukecwik commented on code in PR #24713:
URL: https://github.com/apache/beam/pull/24713#discussion_r1094806645


##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
 
+  private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(java.time.Duration.ofSeconds(60))
+          .removalListener(
+              new RemovalListener<Map<String, String>, SourceTask>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+                  LOG.debug(
+                      "Task for key [[{}]] is being removed. Cause: {}",
+                      removalNotification.getKey(),
+                      removalNotification.getCause());
+                  removalNotification.getValue().stop();
+                }
+              })
+          .maximumSize(10)
+          .build();
+
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
+    final KafkaSourceConsumerFn<T> dofnInstance = this;
+    try {
+      return CONNECTOR_CACHE.get(
+          element,
+          new Callable<SourceTask>() {
+            @Override
+            public SourceTask call() throws Exception {
+              SourceConnector connector = null;
+              SourceTask innerTask;
+              Map<String, String> configuration = new HashMap<>(element);
+              configuration.put(BEAM_INSTANCE_PROPERTY, dofnInstance.getHashCode());
+              try {
+                connector = connectorClass.getDeclaredConstructor().newInstance();
+                connector.start(configuration);
+                innerTask =
+                    (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+              } catch (InstantiationException
+                  | IllegalAccessException
+                  | InvocationTargetException
+                  | NoSuchMethodException e) {
+                throw new RuntimeException(
+                    "Unable to initialize connector instance for Debezium", e);
+              }
+              Map<String, ?> consumerOffset = tracker.currentRestriction().offset;
+              LOG.debug("--------- Created new Debezium task with offset: {}", consumerOffset);
+
+              innerTask.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset));

Review Comment:
   ```suggestion
                innerTask.initialize(new BeamSourceTaskContext(consumerOffset));
   ```
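   (This simply reuses the `consumerOffset` local captured a few lines above instead of re-reading `tracker.currentRestriction().offset`.)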



##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
 
+  private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(java.time.Duration.ofSeconds(60))
+          .removalListener(
+              new RemovalListener<Map<String, String>, SourceTask>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+                  LOG.debug(
+                      "Task for key [[{}]] is being removed. Cause: {}",
+                      removalNotification.getKey(),
+                      removalNotification.getCause());
+                  removalNotification.getValue().stop();
+                }
+              })
+          .maximumSize(10)
+          .build();
+
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
+    final KafkaSourceConsumerFn<T> dofnInstance = this;
+    try {

Review Comment:
   If we have a value in the cache, do we need to update the consumer offset on 
what is being returned (we currently only do it on the first creation)?
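   
   For illustration only, a minimal sketch (not the PR's code) of one way to do that, assuming the cached `SourceTask` can safely be re-initialized with the current restriction's offset on every lookup, and using a hypothetical `createTask(element)` helper that wraps the connector/task construction shown above (Guava's `Cache.get(key, Callable)` throws `ExecutionException`):
   ```java
   SourceTask getDebeziumSourceTask(
       Map<String, String> element,
       RestrictionTracker<OffsetHolder, Map<String, Object>> tracker)
       throws ExecutionException {
     SourceTask task = CONNECTOR_CACHE.get(element, () -> createTask(element));
     // Hypothetical: hand the current restriction's offset to the task on every
     // lookup, not only when it is first created, so a cached task resumes from
     // this restriction's position rather than the one it was created with.
     task.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset));
     return task;
   }
   ```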



##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
 
+  private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(java.time.Duration.ofSeconds(60))
+          .removalListener(
+              new RemovalListener<Map<String, String>, SourceTask>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+                  LOG.debug(
+                      "Task for key [[{}]] is being removed. Cause: {}",
+                      removalNotification.getKey(),
+                      removalNotification.getCause());
+                  removalNotification.getValue().stop();
+                }
+              })
+          .maximumSize(10)
+          .build();
+
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {

Review Comment:
   Can the source task be used in parallel if there are multiple elements with 
the same configuration but different restrictions (since multiple `element + 
restriction` pairs can be executed in parallel while the map is keyed by the 
`element`)?
   
   If no, then you want to build an object pool and not an object cache



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
