capistrant commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2684644064
##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -94,140 +76,93 @@ public PersistedCompactionStateManager(
this.connector = connector;
}
- @LifecycleStart
- public void start()
- {
- }
-
- @LifecycleStop
- public void stop()
- {
- }
-
- @VisibleForTesting
- PersistedCompactionStateManager()
- {
- this.dbTables = null;
- this.jsonMapper = null;
- this.deterministicMapper = null;
- this.connector = null;
- }
-
@Override
- public void persistCompactionState(
+ public void upsertCompactionState(
final String dataSource,
- final Map<String, CompactionState> fingerprintToStateMap,
+ final String fingerprint,
+ final CompactionState compactionState,
final DateTime updateTime
)
{
- if (fingerprintToStateMap.isEmpty()) {
+ if (compactionState == null || fingerprint == null ||
fingerprint.isEmpty()) {
return;
}
- final Lock lock = datasourceLocks.get(dataSource);
- lock.lock();
try {
connector.retryWithHandle(handle -> {
- // Fetch already existing compaction state fingerprints
- final Set<String> existingFingerprints = getExistingFingerprints(
- handle,
- fingerprintToStateMap.keySet()
- );
+ // Check if the fingerprint already exists
+ final boolean fingerprintExists = isExistingFingerprint(handle,
fingerprint);
+ final String now = updateTime.toString();
- if (!existingFingerprints.isEmpty()) {
+ if (fingerprintExists) {
+ // Fingerprint exists - update the used flag
log.info(
- "Found already existing compaction state in the DB for
dataSource[%s]. Fingerprints: %s.",
- dataSource,
- existingFingerprints
+ "Found already existing compaction state in DB for
fingerprint[%s] in dataSource[%s].",
+ fingerprint,
+ dataSource
);
- String setFingerprintsUsedSql = StringUtils.format(
+ String updateSql = StringUtils.format(
"UPDATE %s SET used = :used, used_status_last_updated =
:used_status_last_updated "
+ "WHERE fingerprint = :fingerprint",
dbTables.getCompactionStatesTable()
);
- PreparedBatch markUsedBatch =
handle.prepareBatch(setFingerprintsUsedSql);
- for (String fingerprint : existingFingerprints) {
- final String now = updateTime.toString();
- markUsedBatch.add()
- .bind("used", true)
- .bind("used_status_last_updated", now)
- .bind("fingerprint", fingerprint);
- }
- markUsedBatch.execute();
- }
-
- Map<String, CompactionState> statesToPersist = new HashMap<>();
+ handle.createStatement(updateSql)
+ .bind("used", true)
+ .bind("used_status_last_updated", now)
+ .bind("fingerprint", fingerprint)
+ .execute();
- for (Map.Entry<String, CompactionState> entry :
fingerprintToStateMap.entrySet()) {
- if (!existingFingerprints.contains(entry.getKey())) {
- statesToPersist.put(entry.getKey(), entry.getValue());
- }
- }
+ log.info("Updated existing compaction state for datasource[%s].",
dataSource);
+ } else {
- if (statesToPersist.isEmpty()) {
- log.info("No compaction state to persist for dataSource [%s].",
dataSource);
- return null;
- }
+ // Fingerprint doesn't exist - insert new state
+ log.info("Inserting new compaction state for fingerprint[%s] in
dataSource[%s].", fingerprint, dataSource);
- final List<List<String>> partitionedFingerprints = Lists.partition(
- new ArrayList<>(statesToPersist.keySet()),
- DB_ACTION_PARTITION_SIZE
- );
+ String insertSql = StringUtils.format(
+ "INSERT INTO %s (created_date, dataSource, fingerprint, payload,
used, used_status_last_updated) "
+ + "VALUES (:created_date, :dataSource, :fingerprint, :payload,
:used, :used_status_last_updated)",
+ dbTables.getCompactionStatesTable()
+ );
- String insertSql = StringUtils.format(
- "INSERT INTO %s (created_date, datasource, fingerprint, payload,
used, used_status_last_updated) "
- + "VALUES (:created_date, :datasource, :fingerprint, :payload,
:used, :used_status_last_updated)",
- dbTables.getCompactionStatesTable()
- );
+ try {
+ handle.createStatement(insertSql)
+ .bind("created_date", now)
+ .bind("dataSource", dataSource)
+ .bind("fingerprint", fingerprint)
+ .bind("payload",
jsonMapper.writeValueAsBytes(compactionState))
+ .bind("used", true)
+ .bind("used_status_last_updated", now)
+ .execute();
- // Insert compaction states
- PreparedBatch stateInsertBatch = handle.prepareBatch(insertSql);
- for (List<String> partition : partitionedFingerprints) {
- for (String fingerprint : partition) {
- final String now = updateTime.toString();
- try {
- stateInsertBatch.add()
- .bind("created_date", now)
- .bind("datasource", dataSource)
- .bind("fingerprint", fingerprint)
- .bind("payload",
jsonMapper.writeValueAsBytes(fingerprintToStateMap.get(fingerprint)))
- .bind("used", true)
- .bind("used_status_last_updated", now);
- }
- catch (JsonProcessingException e) {
- throw InternalServerError.exception(
- e,
- "Failed to serialize compaction state for fingerprint[%s]",
- fingerprint
- );
- }
- }
- final int[] affectedRows = stateInsertBatch.execute();
- final List<String> failedInserts = new ArrayList<>();
- for (int i = 0; i < partition.size(); ++i) {
- if (affectedRows[i] != 1) {
- failedInserts.add(partition.get(i));
- }
- }
- if (failedInserts.isEmpty()) {
log.info(
- "Published compaction states %s to DB for datasource[%s].",
- partition,
+ "Published compaction state for fingerprint[%s] to DB for
datasource[%s].",
+ fingerprint,
dataSource
);
Review Comment:
ya great call out
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]