swapneshgandhi commented on a change in pull request #8272: Support incremental load in Druid lookups
URL: https://github.com/apache/incubator-druid/pull/8272#discussion_r327358928
##########
File path: extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/JdbcCacheGenerator.java
##########
@@ -101,23 +121,50 @@
}
);
final String newVersion;
- if (lastDBUpdate != null) {
- newVersion = lastDBUpdate.toString();
- } else {
- newVersion = StringUtils.format("%d", dbQueryStart);
- }
- final CacheScheduler.VersionedCache versionedCache =
scheduler.createVersionedCache(entryId, newVersion);
+ CacheScheduler.VersionedCache versionedCache = null;
try {
- final Map<String, String> cache = versionedCache.getCache();
- for (Pair<String, String> pair : pairs) {
- cache.put(pair.lhs, pair.rhs);
+ if (doIncrementalLoad) {
+ newVersion = StringUtils.format("%d", lastDBUpdate);
+ Map<String, String> newCachedEntries = new HashMap<>();
+ for (Pair<String, String> pair : pairs) {
+ newCachedEntries.put(pair.lhs, pair.rhs);
+ }
+ versionedCache = entryId.createFromExistingCache(entryId, newVersion,
newCachedEntries);
+ LOG.info("Finished loading %d new incremental values in last %s for %s
",
+ newCachedEntries.size(),
+ formatter.print(new Period(lastCheck, lastDBUpdate)),
+ entryId
+ );
+ return versionedCache;
+ } else {
+ LOG.debug("Not doing incremental load because either " +
+ "namespace.getTsColumn() is not set or this is the first
load." +
+ " lastDBUpdate: %s, namespace.getTsColumn(): %s, lastVersion:
%s",
+ lastDBUpdate,
+ namespace.getTsColumn(),
+ lastVersion);
+ if (lastDBUpdate != null) {
+ // Setting newVersion to lastDBUpdate will ensure that during next load
+ // we read the keys that were modified after lastDBUpdate.
+ // See how lastCheck is being set in the beginning of generateCache().
+ newVersion = StringUtils.format("%d", lastDBUpdate);
+ } else {
+ newVersion = StringUtils.format("%d", dbQueryStart);
+ }
+ versionedCache = scheduler.createVersionedCache(entryId, newVersion,
null);
+ final Map<String, String> cache = versionedCache.getCache();
+ for (Pair<String, String> pair : pairs) {
+ cache.put(pair.lhs, pair.rhs);
+ }
+ LOG.info("Finished loading %d values for %s", cache.size(), entryId);
+ return versionedCache;
}
- LOG.info("Finished loading %d values for %s", cache.size(), entryId);
- return versionedCache;
}
catch (Throwable t) {
try {
- versionedCache.close();
+ if (versionedCache != null) {
+ versionedCache.close();
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]