kfaraz commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2680837382


##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -398,4 +313,40 @@ private static void bindValuesToInClause(
       query.bind(parameterPrefix + i, values.get(i));
     }
   }
+
+  /**
+   * Checks if an exception is a unique constraint violation.
+   * This is expected when multiple threads try to insert the same fingerprint concurrently.
+   * Since operations are idempotent, these violations can be safely ignored.
+   */
+  private boolean isUniqueConstraintViolation(Exception e)
+  {
+    // Look for SQLException in the cause chain
+    Throwable cause = e;
+    while (cause != null) {
+      if (cause instanceof SQLException) {
+        SQLException sqlException = (SQLException) cause;
+        String sqlState = sqlException.getSQLState();
+
+        // SQL standard unique constraint violation codes
+        // 23505 = unique_violation (PostgreSQL, Derby)
+        // 23000 = integrity_constraint_violation (MySQL and others)
+        if ("23505".equals(sqlState) || "23000".equals(sqlState)) {

Review Comment:
   It would be cleaner to add the `isUniqueConstraintViolation` method to `SQLMetadataConnector` so that each concrete DB may provide its own implementation, rather than handling all possible cases here.



##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -94,140 +76,93 @@ public PersistedCompactionStateManager(
     this.connector = connector;
   }
 
-  @LifecycleStart
-  public void start()
-  {
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-  }
-
-  @VisibleForTesting
-  PersistedCompactionStateManager()
-  {
-    this.dbTables = null;
-    this.jsonMapper = null;
-    this.deterministicMapper = null;
-    this.connector = null;
-  }
-
   @Override
-  public void persistCompactionState(
+  public void upsertCompactionState(
       final String dataSource,
-      final Map<String, CompactionState> fingerprintToStateMap,
+      final String fingerprint,
+      final CompactionState compactionState,
       final DateTime updateTime
   )
   {
-    if (fingerprintToStateMap.isEmpty()) {
+    if (compactionState == null || fingerprint == null || fingerprint.isEmpty()) {
       return;
     }
 
-    final Lock lock = datasourceLocks.get(dataSource);
-    lock.lock();
     try {
       connector.retryWithHandle(handle -> {
-        // Fetch already existing compaction state fingerprints
-        final Set<String> existingFingerprints = getExistingFingerprints(
-            handle,
-            fingerprintToStateMap.keySet()
-        );
+        // Check if the fingerprint already exists
+        final boolean fingerprintExists = isExistingFingerprint(handle, fingerprint);
+        final String now = updateTime.toString();
 
-        if (!existingFingerprints.isEmpty()) {
+        if (fingerprintExists) {
+          // Fingerprint exists - update the used flag
           log.info(
-              "Found already existing compaction state in the DB for dataSource[%s]. Fingerprints: %s.",
-              dataSource,
-              existingFingerprints
+              "Found already existing compaction state in DB for fingerprint[%s] in dataSource[%s].",
+              fingerprint,
+              dataSource
           );
-          String setFingerprintsUsedSql = StringUtils.format(
+          String updateSql = StringUtils.format(

Review Comment:
   I think that the `UPDATE` should be needed only if the fingerprint is currently marked as unused. If already marked as used, we shouldn't need to update anything. What do you think?
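   The check could also be pushed into the SQL itself, so an already-used fingerprint results in zero updated rows. A hedged sketch (the table name is a placeholder, and the exact boolean literal may vary by metadata store):

   ```java
   // Sketch: only flip the flag when the row is currently unused, making a
   // second call for the same fingerprint a no-op write.
   class UpdateSqlSketch
   {
     static String updateIfUnusedSql(String table)
     {
       return String.format(
           "UPDATE %s SET used = :used, used_status_last_updated = :used_status_last_updated "
           + "WHERE fingerprint = :fingerprint AND used = false",
           table
       );
     }
   }
   ```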



##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -94,140 +76,93 @@ public PersistedCompactionStateManager(
     this.connector = connector;
   }
 
-  @LifecycleStart
-  public void start()
-  {
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-  }
-
-  @VisibleForTesting
-  PersistedCompactionStateManager()
-  {
-    this.dbTables = null;
-    this.jsonMapper = null;
-    this.deterministicMapper = null;
-    this.connector = null;
-  }
-
   @Override
-  public void persistCompactionState(
+  public void upsertCompactionState(
       final String dataSource,
-      final Map<String, CompactionState> fingerprintToStateMap,
+      final String fingerprint,
+      final CompactionState compactionState,
       final DateTime updateTime
   )
   {
-    if (fingerprintToStateMap.isEmpty()) {
+    if (compactionState == null || fingerprint == null || fingerprint.isEmpty()) {
       return;
     }
 
-    final Lock lock = datasourceLocks.get(dataSource);
-    lock.lock();
     try {
       connector.retryWithHandle(handle -> {
-        // Fetch already existing compaction state fingerprints
-        final Set<String> existingFingerprints = getExistingFingerprints(
-            handle,
-            fingerprintToStateMap.keySet()
-        );
+        // Check if the fingerprint already exists
+        final boolean fingerprintExists = isExistingFingerprint(handle, fingerprint);
+        final String now = updateTime.toString();
 
-        if (!existingFingerprints.isEmpty()) {
+        if (fingerprintExists) {
+          // Fingerprint exists - update the used flag
           log.info(
-              "Found already existing compaction state in the DB for dataSource[%s]. Fingerprints: %s.",
-              dataSource,
-              existingFingerprints
+              "Found already existing compaction state in DB for fingerprint[%s] in dataSource[%s].",
+              fingerprint,
+              dataSource
           );
-          String setFingerprintsUsedSql = StringUtils.format(
+          String updateSql = StringUtils.format(
              "UPDATE %s SET used = :used, used_status_last_updated = :used_status_last_updated "
               + "WHERE fingerprint = :fingerprint",
               dbTables.getCompactionStatesTable()
           );
-          PreparedBatch markUsedBatch = handle.prepareBatch(setFingerprintsUsedSql);
-          for (String fingerprint : existingFingerprints) {
-            final String now = updateTime.toString();
-            markUsedBatch.add()
-                         .bind("used", true)
-                         .bind("used_status_last_updated", now)
-                         .bind("fingerprint", fingerprint);
-          }
-          markUsedBatch.execute();
-        }
-
-        Map<String, CompactionState> statesToPersist = new HashMap<>();
+          handle.createStatement(updateSql)
+                .bind("used", true)
+                .bind("used_status_last_updated", now)
+                .bind("fingerprint", fingerprint)
+                .execute();
 
-        for (Map.Entry<String, CompactionState> entry : fingerprintToStateMap.entrySet()) {
-          if (!existingFingerprints.contains(entry.getKey())) {
-            statesToPersist.put(entry.getKey(), entry.getValue());
-          }
-        }
+          log.info("Updated existing compaction state for datasource[%s].", dataSource);
+        } else {
 
-        if (statesToPersist.isEmpty()) {
-          log.info("No compaction state to persist for dataSource [%s].", dataSource);
-          return null;
-        }
+          // Fingerprint doesn't exist - insert new state
+          log.info("Inserting new compaction state for fingerprint[%s] in dataSource[%s].", fingerprint, dataSource);
 
-        final List<List<String>> partitionedFingerprints = Lists.partition(
-            new ArrayList<>(statesToPersist.keySet()),
-            DB_ACTION_PARTITION_SIZE
-        );
+          String insertSql = StringUtils.format(
+              "INSERT INTO %s (created_date, dataSource, fingerprint, payload, used, used_status_last_updated) "
+              + "VALUES (:created_date, :dataSource, :fingerprint, :payload, :used, :used_status_last_updated)",
+              dbTables.getCompactionStatesTable()
+          );
 
-        String insertSql = StringUtils.format(
-            "INSERT INTO %s (created_date, datasource, fingerprint, payload, used, used_status_last_updated) "
-            + "VALUES (:created_date, :datasource, :fingerprint, :payload, :used, :used_status_last_updated)",
-            dbTables.getCompactionStatesTable()
-        );
+          try {
+            handle.createStatement(insertSql)
+                  .bind("created_date", now)
+                  .bind("dataSource", dataSource)
+                  .bind("fingerprint", fingerprint)
+                  .bind("payload", jsonMapper.writeValueAsBytes(compactionState))
+                  .bind("used", true)
+                  .bind("used_status_last_updated", now)
+                  .execute();
 
-        // Insert compaction states
-        PreparedBatch stateInsertBatch = handle.prepareBatch(insertSql);
-        for (List<String> partition : partitionedFingerprints) {
-          for (String fingerprint : partition) {
-            final String now = updateTime.toString();
-            try {
-              stateInsertBatch.add()
-                              .bind("created_date", now)
-                              .bind("datasource", dataSource)
-                              .bind("fingerprint", fingerprint)
-                              .bind("payload", jsonMapper.writeValueAsBytes(fingerprintToStateMap.get(fingerprint)))
-                              .bind("used", true)
-                              .bind("used_status_last_updated", now);
-            }
-            catch (JsonProcessingException e) {
-              throw InternalServerError.exception(
-                  e,
-                  "Failed to serialize compaction state for fingerprint[%s]",
-                  fingerprint
-              );
-            }
-          }
-          final int[] affectedRows = stateInsertBatch.execute();
-          final List<String> failedInserts = new ArrayList<>();
-          for (int i = 0; i < partition.size(); ++i) {
-            if (affectedRows[i] != 1) {
-              failedInserts.add(partition.get(i));
-            }
-          }
-          if (failedInserts.isEmpty()) {
             log.info(
-                "Published compaction states %s to DB for datasource[%s].",
-                partition,
+                "Published compaction state for fingerprint[%s] to DB for datasource[%s].",
+                fingerprint,
                 dataSource
             );
-          } else {
-            throw new ISE(
-                "Failed to publish compaction states[%s] to DB for datasource[%s]",
-                failedInserts,
-                dataSource
+          }
+          catch (JsonProcessingException e) {
+            throw InternalServerError.exception(
+                e,
+                "Failed to serialize compaction state for fingerprint[%s]",
+                fingerprint
             );
           }
         }
         return null;
       });
     }
-    finally {
-      lock.unlock();
+    catch (Exception e) {
+      if (isUniqueConstraintViolation(e)) {
+        log.info(
+            "Fingerprints already exist for datasource[%s] (likely concurrent insert). "
+            + "Treating as success since operation is idempotent.",
+            dataSource
+        );
+        // Swallow exception - another thread already persisted the same data
+        return;
+      }
+      // For other exceptions, let them propagate
+      throw e;

Review Comment:
   Nit: Probably cleaner to use if-else.
   ```java
   if (unique constraint violated) {
      // log the details
   } else {
      throw e;
   }
   ```


##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -94,140 +76,93 @@ public PersistedCompactionStateManager(
     this.connector = connector;
   }
 
-  @LifecycleStart
-  public void start()
-  {
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-  }
-
-  @VisibleForTesting
-  PersistedCompactionStateManager()
-  {
-    this.dbTables = null;
-    this.jsonMapper = null;
-    this.deterministicMapper = null;
-    this.connector = null;
-  }
-
   @Override
-  public void persistCompactionState(
+  public void upsertCompactionState(
       final String dataSource,
-      final Map<String, CompactionState> fingerprintToStateMap,
+      final String fingerprint,
+      final CompactionState compactionState,
       final DateTime updateTime
   )
   {
-    if (fingerprintToStateMap.isEmpty()) {
+    if (compactionState == null || fingerprint == null || fingerprint.isEmpty()) {
       return;

Review Comment:
   Should we throw an `InvalidInput` exception here instead?
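   A sketch of what failing fast could look like. `IllegalArgumentException` is used here only as a self-contained stand-in; in Druid this would presumably be thrown via the `InvalidInput` error class instead of returning silently:

   ```java
   // Hedged sketch: reject bad arguments instead of silently returning.
   // IllegalArgumentException stands in for Druid's InvalidInput exception.
   class UpsertValidationSketch
   {
     static void validate(String fingerprint, Object compactionState)
     {
       if (compactionState == null || fingerprint == null || fingerprint.isEmpty()) {
         throw new IllegalArgumentException(
             String.format("Invalid fingerprint[%s] or null compaction state", fingerprint)
         );
       }
     }
   }
   ```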



##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -94,140 +76,93 @@ public PersistedCompactionStateManager(
     this.connector = connector;
   }
 
-  @LifecycleStart
-  public void start()
-  {
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-  }
-
-  @VisibleForTesting
-  PersistedCompactionStateManager()
-  {
-    this.dbTables = null;
-    this.jsonMapper = null;
-    this.deterministicMapper = null;
-    this.connector = null;
-  }
-
   @Override
-  public void persistCompactionState(
+  public void upsertCompactionState(
       final String dataSource,
-      final Map<String, CompactionState> fingerprintToStateMap,
+      final String fingerprint,
+      final CompactionState compactionState,
       final DateTime updateTime
   )
   {
-    if (fingerprintToStateMap.isEmpty()) {
+    if (compactionState == null || fingerprint == null || fingerprint.isEmpty()) {
       return;
     }
 
-    final Lock lock = datasourceLocks.get(dataSource);
-    lock.lock();
     try {
       connector.retryWithHandle(handle -> {
-        // Fetch already existing compaction state fingerprints
-        final Set<String> existingFingerprints = getExistingFingerprints(
-            handle,
-            fingerprintToStateMap.keySet()
-        );
+        // Check if the fingerprint already exists
+        final boolean fingerprintExists = isExistingFingerprint(handle, fingerprint);
+        final String now = updateTime.toString();
 
-        if (!existingFingerprints.isEmpty()) {
+        if (fingerprintExists) {
+          // Fingerprint exists - update the used flag
           log.info(
-              "Found already existing compaction state in the DB for dataSource[%s]. Fingerprints: %s.",
-              dataSource,
-              existingFingerprints
+              "Found already existing compaction state in DB for fingerprint[%s] in dataSource[%s].",
+              fingerprint,
+              dataSource
           );
-          String setFingerprintsUsedSql = StringUtils.format(
+          String updateSql = StringUtils.format(
              "UPDATE %s SET used = :used, used_status_last_updated = :used_status_last_updated "
               + "WHERE fingerprint = :fingerprint",
               dbTables.getCompactionStatesTable()
           );
-          PreparedBatch markUsedBatch = handle.prepareBatch(setFingerprintsUsedSql);
-          for (String fingerprint : existingFingerprints) {
-            final String now = updateTime.toString();
-            markUsedBatch.add()
-                         .bind("used", true)
-                         .bind("used_status_last_updated", now)
-                         .bind("fingerprint", fingerprint);
-          }
-          markUsedBatch.execute();
-        }
-
-        Map<String, CompactionState> statesToPersist = new HashMap<>();
+          handle.createStatement(updateSql)
+                .bind("used", true)
+                .bind("used_status_last_updated", now)
+                .bind("fingerprint", fingerprint)
+                .execute();
 
-        for (Map.Entry<String, CompactionState> entry : fingerprintToStateMap.entrySet()) {
-          if (!existingFingerprints.contains(entry.getKey())) {
-            statesToPersist.put(entry.getKey(), entry.getValue());
-          }
-        }
+          log.info("Updated existing compaction state for datasource[%s].", dataSource);
+        } else {
 
-        if (statesToPersist.isEmpty()) {
-          log.info("No compaction state to persist for dataSource [%s].", 
dataSource);
-          return null;
-        }
+          // Fingerprint doesn't exist - insert new state
+          log.info("Inserting new compaction state for fingerprint[%s] in dataSource[%s].", fingerprint, dataSource);
 
-        final List<List<String>> partitionedFingerprints = Lists.partition(
-            new ArrayList<>(statesToPersist.keySet()),
-            DB_ACTION_PARTITION_SIZE
-        );
+          String insertSql = StringUtils.format(
+              "INSERT INTO %s (created_date, dataSource, fingerprint, payload, used, used_status_last_updated) "
+              + "VALUES (:created_date, :dataSource, :fingerprint, :payload, :used, :used_status_last_updated)",
+              dbTables.getCompactionStatesTable()
+          );
 
-        String insertSql = StringUtils.format(
-            "INSERT INTO %s (created_date, datasource, fingerprint, payload, used, used_status_last_updated) "
-            + "VALUES (:created_date, :datasource, :fingerprint, :payload, :used, :used_status_last_updated)",
-            dbTables.getCompactionStatesTable()
-        );
+          try {
+            handle.createStatement(insertSql)
+                  .bind("created_date", now)
+                  .bind("dataSource", dataSource)
+                  .bind("fingerprint", fingerprint)
+                  .bind("payload", jsonMapper.writeValueAsBytes(compactionState))
+                  .bind("used", true)
+                  .bind("used_status_last_updated", now)
+                  .execute();
 
-        // Insert compaction states
-        PreparedBatch stateInsertBatch = handle.prepareBatch(insertSql);
-        for (List<String> partition : partitionedFingerprints) {
-          for (String fingerprint : partition) {
-            final String now = updateTime.toString();
-            try {
-              stateInsertBatch.add()
-                              .bind("created_date", now)
-                              .bind("datasource", dataSource)
-                              .bind("fingerprint", fingerprint)
-                              .bind("payload", jsonMapper.writeValueAsBytes(fingerprintToStateMap.get(fingerprint)))
-                              .bind("used", true)
-                              .bind("used_status_last_updated", now);
-            }
-            catch (JsonProcessingException e) {
-              throw InternalServerError.exception(
-                  e,
-                  "Failed to serialize compaction state for fingerprint[%s]",
-                  fingerprint
-              );
-            }
-          }
-          final int[] affectedRows = stateInsertBatch.execute();
-          final List<String> failedInserts = new ArrayList<>();
-          for (int i = 0; i < partition.size(); ++i) {
-            if (affectedRows[i] != 1) {
-              failedInserts.add(partition.get(i));
-            }
-          }
-          if (failedInserts.isEmpty()) {
             log.info(
-                "Published compaction states %s to DB for datasource[%s].",
-                partition,
+                "Published compaction state for fingerprint[%s] to DB for datasource[%s].",
+                fingerprint,
                 dataSource
             );
-          } else {
-            throw new ISE(
-                "Failed to publish compaction states[%s] to DB for datasource[%s]",
-                failedInserts,
-                dataSource
+          }
+          catch (JsonProcessingException e) {
+            throw InternalServerError.exception(
+                e,
+                "Failed to serialize compaction state for fingerprint[%s]",
+                fingerprint
             );
           }
         }
         return null;
       });
     }
-    finally {
-      lock.unlock();
+    catch (Exception e) {
+      if (isUniqueConstraintViolation(e)) {
+        log.info(

Review Comment:
   thanks for adding this!



##########
server/src/main/java/org/apache/druid/segment/metadata/DefaultCompactionFingerprintMapper.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.timeline.CompactionState;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+
+/**
+ * Default implementation of {@link CompactionFingerprintMapper} that delegates to
+ * {@link CompactionStateStorage} for fingerprint generation and {@link CompactionStateCache}
+ * for state lookups.
+ */
+@LazySingleton
+public class DefaultCompactionFingerprintMapper implements CompactionFingerprintMapper
+{
+  private final CompactionStateStorage compactionStateStorage;
+  private final CompactionStateCache compactionStateCache;
+
+  @Inject
+  public DefaultCompactionFingerprintMapper(
+      CompactionStateStorage compactionStateStorage,
+      @Nullable CompactionStateCache compactionStateCache
+  )
+  {
+    this.compactionStateStorage = compactionStateStorage;
+    this.compactionStateCache = compactionStateCache;
+  }
+
+  @Override
+  public String generateFingerprint(String dataSource, CompactionState compactionState)
+  {
+    return compactionStateStorage.generateCompactionStateFingerprint(compactionState, dataSource);
+  }
+
+  @Override
+  public Optional<CompactionState> getStateForFingerprint(String fingerprint)
+  {
+    if (compactionStateCache == null) {

Review Comment:
   The cache would be null only if incremental caching is disabled on the Overlord.
   In that case, should we just throw an exception here instead?



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -280,7 +292,9 @@ private int submitCompactionTasks(
   public static ClientCompactionTaskQuery createCompactionTask(
       CompactionCandidate candidate,
       DataSourceCompactionConfig config,
-      CompactionEngine defaultEngine
+      CompactionEngine defaultEngine,
+      String compactionStateFingerprint,
+      boolean persistLastCompactionStateInSegments

Review Comment:
   Nit: to align with the config name
   ```suggestion
         boolean storeCompactionStatePerSegment
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -303,62 +238,42 @@ public int deleteUnusedCompactionStatesOlderThan(long timestamp)
   }
 
   @Override
-  @SuppressWarnings("UnstableApiUsage")
   public String generateCompactionStateFingerprint(
       final CompactionState compactionState,
       final String dataSource
   )
   {
-    final Hasher hasher = Hashing.sha256().newHasher();
-
-    hasher.putBytes(StringUtils.toUtf8(dataSource));
-    hasher.putByte((byte) 0xff);
-
-    try {
-      hasher.putBytes(deterministicMapper.writeValueAsBytes(compactionState));
-    }
-    catch (JsonProcessingException e) {
-      throw new RuntimeException("Failed to serialize CompactionState for fingerprinting", e);
-    }
-    hasher.putByte((byte) 0xff);
-
-    return BaseEncoding.base16().encode(hasher.hash().asBytes());
+    return CompactionStateFingerprints.generate(compactionState, dataSource, deterministicMapper);
   }
 
 
   /**
-   * Query the metadata DB to filter the fingerprints that already exist.
-   **/
-  private Set<String> getExistingFingerprints(
+   * Checks if a fingerprint already exists in the metadata DB.
+   *
+   * @param handle Database handle
+   * @param fingerprintToCheck The fingerprint to check
+   * @return true if the fingerprint exists, false otherwise
+   */
+  private boolean isExistingFingerprint(
       final Handle handle,
-      final Set<String> fingerprintsToInsert
+      @Nonnull final String fingerprintToCheck
   )
   {
-    if (fingerprintsToInsert.isEmpty()) {
-      return Collections.emptySet();
+    if (fingerprintToCheck.isEmpty()) {
+      return false;
     }
 
-    List<List<String>> partitionedFingerprints = Lists.partition(
-        new ArrayList<>(fingerprintsToInsert),
-        DB_ACTION_PARTITION_SIZE
+    String sql = StringUtils.format(
+        "SELECT COUNT(*) FROM %s WHERE fingerprint = :fingerprint",

Review Comment:
   Nit: Why count(*)? We could just fire a `SELECT fingerprint, used FROM %s WHERE fingerprint = :fingerprint` and check if the result set is empty or not. An aggregation function might require some extra processing on the DB engine even if aggregating only a single row.
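   The suggested shape of the existence check, sketched as a plain query string (table name is a placeholder); the caller would then test whether the result list is empty rather than reading a count:

   ```java
   // Sketch of the alternative existence check: select the row itself and
   // test for presence, avoiding the aggregate.
   class ExistenceQuerySketch
   {
     static String existsSql(String table)
     {
       return String.format(
           "SELECT fingerprint, used FROM %s WHERE fingerprint = :fingerprint",
           table
       );
     }
     // With a JDBI handle this might be used roughly as:
     //   boolean exists = !handle.createQuery(existsSql(table))
     //       .bind("fingerprint", fingerprint).list().isEmpty();
   }
   ```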



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java:
##########
@@ -72,9 +71,9 @@ public List<CompactionJob> createCompactionJobs(
 
     final List<CompactionJob> jobs = new ArrayList<>();
 
-    CompactionState compactionState = CompactionStatus.createCompactionStateFromConfig(config);
+    CompactionState compactionState = config.toCompactionState();
 
-    String compactionStateFingerprint = params.getCompactionStateManager().generateCompactionStateFingerprint(
+    String compactionStateFingerprint = params.getCompactionStateStorageImpl().generateCompactionStateFingerprint(

Review Comment:
   Use fingerprint mapper here instead.



##########
server/src/main/java/org/apache/druid/segment/metadata/DefaultCompactionFingerprintMapper.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.timeline.CompactionState;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+
+/**
+ * Default implementation of {@link CompactionFingerprintMapper} that delegates to
+ * {@link CompactionStateStorage} for fingerprint generation and {@link CompactionStateCache}
+ * for state lookups.
+ */
+@LazySingleton
+public class DefaultCompactionFingerprintMapper implements CompactionFingerprintMapper
+{
+  private final CompactionStateStorage compactionStateStorage;
+  private final CompactionStateCache compactionStateCache;
+
+  @Inject

Review Comment:
   I see `OverlordCompactionScheduler` creating an instance of this class explicitly. Does it need to be injected somewhere as well? I feel that for this class, it makes sense to always construct it explicitly rather than being injected.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java:
##########
@@ -357,14 +357,15 @@ private String startTaskIfReady(CompactionJob job)
   }
 
   /**
-   * Persist the compaction state associated with the given job with {@link CompactionStateManager}.
+   * Persist the compaction state associated with the given job with {@link CompactionStateStorage}.
    */
   private void persistPendingCompactionState(CompactionJob job)
   {
-    if (job.getCompactionState() != null && job.getCompactionStateFingerprint() != null) {
-      jobParams.getCompactionStateManager().persistCompactionState(
+    if (job.getTargetCompactionState() != null && job.getTargetCompactionStateFingerprint() != null) {
+      jobParams.getCompactionStateStorageImpl().upsertCompactionState(

Review Comment:
   I wonder if it would make sense for this compaction state and fingerprint to be upserted only when the corresponding segments are being committed.
   - The various task types that perform compaction already generate the `lastCompactionState` and pass it in the `DataSegment` objects that are committed via the task actions (`SegmentTransactionalInsertAction` / `SegmentTransactionalReplaceAction`).
   - Overlord just needs to decide whether to persist one compaction state, or separate compaction states for all segments, or both.
   
   If we persist the compaction state proactively before the actual compaction task has even launched and the compaction task takes long to finish, this fingerprint might be considered unreferenced and cleaned up (in case the cleanup is too aggressive).
   
   Is there any advantage to persisting this compaction state proactively while creating the jobs itself?



##########
server/src/main/java/org/apache/druid/segment/metadata/DefaultCompactionFingerprintMapper.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.timeline.CompactionState;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+
+/**
+ * Default implementation of {@link CompactionFingerprintMapper} that delegates to
+ * {@link CompactionStateStorage} for fingerprint generation and {@link CompactionStateCache}
+ * for state lookups.
+ */
+@LazySingleton
+public class DefaultCompactionFingerprintMapper implements CompactionFingerprintMapper
+{
+  private final CompactionStateStorage compactionStateStorage;
+  private final CompactionStateCache compactionStateCache;
+
+  @Inject
+  public DefaultCompactionFingerprintMapper(
+      CompactionStateStorage compactionStateStorage,
+      @Nullable CompactionStateCache compactionStateCache
+  )
+  {
+    this.compactionStateStorage = compactionStateStorage;
+    this.compactionStateCache = compactionStateCache;
+  }
+
+  @Override
+  public String generateFingerprint(String dataSource, CompactionState compactionState)
+  {
+    return 
compactionStateStorage.generateCompactionStateFingerprint(compactionState, 
dataSource);

Review Comment:
   I wonder if we need to delegate to `CompactionStateStorage` anymore.
   Is it okay to just do the fingerprint generation here?
   
   It would allow us to remove the `generateFingerprint` method from `CompactionStateStorage`, as it doesn't really fit there.
   
   We might optionally also get rid of the utility class `CompactionStateFingerprints`, since I think this would be the only usage of the generate-fingerprint method. (Tests could also construct a `DefaultCompactionFingerprintMapper`.)
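   If the generation were inlined into the mapper as suggested, a minimal sketch might look like the following. This is an assumption-laden illustration, not the actual Druid implementation: it uses the JDK's `MessageDigest` rather than Druid's Guava-based hashing, and `serializedState` stands in for the deterministic JSON of the `CompactionState`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class FingerprintSketch
{
  // Hashes the datasource name and the deterministic JSON of the compaction
  // state into a hex fingerprint. A zero byte separates the two inputs so
  // that ("ab", "c") and ("a", "bc") cannot collide by concatenation.
  static String generateFingerprint(String dataSource, String serializedState)
  {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      digest.update(dataSource.getBytes(StandardCharsets.UTF_8));
      digest.update((byte) 0);
      digest.update(serializedState.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest()) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    }
    catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 not available", e);
    }
  }

  public static void main(String[] args)
  {
    // Same inputs always yield the same fingerprint; different states differ.
    String fp1 = generateFingerprint("wiki", "{\"a\":1}");
    String fp2 = generateFingerprint("wiki", "{\"a\":1}");
    String fp3 = generateFingerprint("wiki", "{\"a\":2}");
    System.out.println(fp1.equals(fp2)); // prints true
    System.out.println(fp1.equals(fp3)); // prints false
  }
}
```

   Determinism of the serialized form is the key requirement here, which is why the real code configures its mapper to sort map entries and properties.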



##########
server/src/main/java/org/apache/druid/segment/metadata/SqlCompactionStateStorage.java:
##########
@@ -94,140 +76,93 @@ public PersistedCompactionStateManager(
     this.connector = connector;
   }
 
-  @LifecycleStart
-  public void start()
-  {
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-  }
-
-  @VisibleForTesting
-  PersistedCompactionStateManager()
-  {
-    this.dbTables = null;
-    this.jsonMapper = null;
-    this.deterministicMapper = null;
-    this.connector = null;
-  }
-
   @Override
-  public void persistCompactionState(
+  public void upsertCompactionState(
       final String dataSource,
-      final Map<String, CompactionState> fingerprintToStateMap,
+      final String fingerprint,
+      final CompactionState compactionState,
       final DateTime updateTime
   )
   {
-    if (fingerprintToStateMap.isEmpty()) {
+    if (compactionState == null || fingerprint == null || fingerprint.isEmpty()) {
       return;
     }
 
-    final Lock lock = datasourceLocks.get(dataSource);
-    lock.lock();
     try {
       connector.retryWithHandle(handle -> {
-        // Fetch already existing compaction state fingerprints
-        final Set<String> existingFingerprints = getExistingFingerprints(
-            handle,
-            fingerprintToStateMap.keySet()
-        );
+        // Check if the fingerprint already exists
+        final boolean fingerprintExists = isExistingFingerprint(handle, fingerprint);
+        final String now = updateTime.toString();
 
-        if (!existingFingerprints.isEmpty()) {
+        if (fingerprintExists) {
+          // Fingerprint exists - update the used flag
           log.info(
-              "Found already existing compaction state in the DB for dataSource[%s]. Fingerprints: %s.",
-              dataSource,
-              existingFingerprints
+              "Found already existing compaction state in DB for fingerprint[%s] in dataSource[%s].",
+              fingerprint,
+              dataSource
           );
-          String setFingerprintsUsedSql = StringUtils.format(
+          String updateSql = StringUtils.format(
              "UPDATE %s SET used = :used, used_status_last_updated = :used_status_last_updated "
               + "WHERE fingerprint = :fingerprint",
               dbTables.getCompactionStatesTable()
           );
-          PreparedBatch markUsedBatch = handle.prepareBatch(setFingerprintsUsedSql);
-          for (String fingerprint : existingFingerprints) {
-            final String now = updateTime.toString();
-            markUsedBatch.add()
-                         .bind("used", true)
-                         .bind("used_status_last_updated", now)
-                         .bind("fingerprint", fingerprint);
-          }
-          markUsedBatch.execute();
-        }
-
-        Map<String, CompactionState> statesToPersist = new HashMap<>();
+          handle.createStatement(updateSql)
+                .bind("used", true)
+                .bind("used_status_last_updated", now)
+                .bind("fingerprint", fingerprint)
+                .execute();
 
-        for (Map.Entry<String, CompactionState> entry : fingerprintToStateMap.entrySet()) {
-          if (!existingFingerprints.contains(entry.getKey())) {
-            statesToPersist.put(entry.getKey(), entry.getValue());
-          }
-        }
+          log.info("Updated existing compaction state for datasource[%s].", dataSource);
+        } else {
 
-        if (statesToPersist.isEmpty()) {
-          log.info("No compaction state to persist for dataSource [%s].", dataSource);
-          return null;
-        }
+          // Fingerprint doesn't exist - insert new state
+          log.info("Inserting new compaction state for fingerprint[%s] in dataSource[%s].", fingerprint, dataSource);
 
-        final List<List<String>> partitionedFingerprints = Lists.partition(
-            new ArrayList<>(statesToPersist.keySet()),
-            DB_ACTION_PARTITION_SIZE
-        );
+          String insertSql = StringUtils.format(
+              "INSERT INTO %s (created_date, dataSource, fingerprint, payload, used, used_status_last_updated) "
+              + "VALUES (:created_date, :dataSource, :fingerprint, :payload, :used, :used_status_last_updated)",
+              dbTables.getCompactionStatesTable()
+          );
 
-        String insertSql = StringUtils.format(
-            "INSERT INTO %s (created_date, datasource, fingerprint, payload, used, used_status_last_updated) "
-            + "VALUES (:created_date, :datasource, :fingerprint, :payload, :used, :used_status_last_updated)",
-            dbTables.getCompactionStatesTable()
-        );
+          try {
+            handle.createStatement(insertSql)
+                  .bind("created_date", now)
+                  .bind("dataSource", dataSource)
+                  .bind("fingerprint", fingerprint)
+                  .bind("payload", jsonMapper.writeValueAsBytes(compactionState))
+                  .bind("used", true)
+                  .bind("used_status_last_updated", now)
+                  .execute();
 
-        // Insert compaction states
-        PreparedBatch stateInsertBatch = handle.prepareBatch(insertSql);
-        for (List<String> partition : partitionedFingerprints) {
-          for (String fingerprint : partition) {
-            final String now = updateTime.toString();
-            try {
-              stateInsertBatch.add()
-                              .bind("created_date", now)
-                              .bind("datasource", dataSource)
-                              .bind("fingerprint", fingerprint)
-                              .bind("payload", jsonMapper.writeValueAsBytes(fingerprintToStateMap.get(fingerprint)))
-                              .bind("used", true)
-                              .bind("used_status_last_updated", now);
-            }
-            catch (JsonProcessingException e) {
-              throw InternalServerError.exception(
-                  e,
-                  "Failed to serialize compaction state for fingerprint[%s]",
-                  fingerprint
-              );
-            }
-          }
-          final int[] affectedRows = stateInsertBatch.execute();
-          final List<String> failedInserts = new ArrayList<>();
-          for (int i = 0; i < partition.size(); ++i) {
-            if (affectedRows[i] != 1) {
-              failedInserts.add(partition.get(i));
-            }
-          }
-          if (failedInserts.isEmpty()) {
             log.info(
-                "Published compaction states %s to DB for datasource[%s].",
-                partition,
+                "Published compaction state for fingerprint[%s] to DB for datasource[%s].",
+                fingerprint,
                 dataSource
             );

Review Comment:
   Once upserted, should we also update this fingerprint in the `CompactionStateCache`?
   Otherwise, this fingerprint will not be reflected in the cache until the next sync, and the `CompactionJobQueue` might have another invocation in that time.
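   A write-through update along the lines suggested here could look roughly like this. The `SimpleStateCache` class and `upsertAndCache` method are illustrative stand-ins, not the actual `CompactionStateCache` API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WriteThroughSketch
{
  // Minimal stand-in for a fingerprint -> state cache.
  static class SimpleStateCache
  {
    private final Map<String, String> fingerprintToState = new ConcurrentHashMap<>();

    void put(String fingerprint, String state)
    {
      fingerprintToState.put(fingerprint, state);
    }

    String get(String fingerprint)
    {
      return fingerprintToState.get(fingerprint);
    }
  }

  // After a successful DB upsert, push the mapping into the cache immediately
  // so callers see it before the next metadata-store sync.
  static void upsertAndCache(SimpleStateCache cache, String fingerprint, String state)
  {
    // 1. persist to the metadata store (stubbed out in this sketch)
    // 2. on success, update the cache
    cache.put(fingerprint, state);
  }

  public static void main(String[] args)
  {
    SimpleStateCache cache = new SimpleStateCache();
    upsertAndCache(cache, "fp-123", "{\"state\":\"compacted\"}");
    System.out.println(cache.get("fp-123") != null); // prints true
  }
}
```

   Since the upsert is idempotent, racing writers putting the same fingerprint into the cache is harmless.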



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java:
##########
@@ -125,8 +125,8 @@ public CompactionJobQueue(
         clusterCompactionConfig,
         dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource()::get,
         snapshotBuilder,
-        compactionStateManager,
-        compactionStateCache
+        fingerprintMapper,
+        compactionStateStorage

Review Comment:
   Instead of passing the storage to the job params, we should probably just keep a reference to it in the `CompactionJobQueue` class itself, since we only do the persistence here.



##########
server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java:
##########
@@ -84,12 +88,16 @@ public DataSourceCompactibleSegmentIterator(
       DataSourceCompactionConfig config,
       SegmentTimeline timeline,
       List<Interval> skipIntervals,
-      CompactionCandidateSearchPolicy searchPolicy
+      CompactionCandidateSearchPolicy searchPolicy,
+      CompactionStateManager compactionStateManager,
+      CompactionStateCache compactionStateCache

Review Comment:
   It doesn't seem like the `CompactionConfigBasedJobTemplate` does an upsert anymore (it does a generate, though, which can just use the `CompactionFingerprintMapper` interface), so the current state of the `CompactionStateFingerprintMapper` should be good. 👍🏻



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable()
     }
   }
 
+  /**
+   * Creates the compaction states table for storing fingerprinted compaction states.
+   * <p>
+   * This table stores unique compaction states that are referenced by
+   * segments via fingerprints.
+   */
+  public void createCompactionStatesTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"
+                + "  fingerprint VARCHAR(255) NOT NULL,\n"
+                + "  payload %3$s NOT NULL,\n"
+                + "  used BOOLEAN NOT NULL,\n"
+                + "  used_status_last_updated VARCHAR(255) NOT NULL,\n"
+                + "  PRIMARY KEY (id),\n"
+                + "  UNIQUE (fingerprint)\n"

Review Comment:
   Yeah, it shouldn't matter, just didn't like the idea of an extra column that is never going to be used.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java:
##########
@@ -31,27 +32,37 @@ public class CompactionJob extends BatchIndexingJob
 {
   private final CompactionCandidate candidate;
   private final int maxRequiredTaskSlots;
+  private final String compactionStateFingerprint;
+  private final CompactionState compactionState;

Review Comment:
   >  since it indicates it being a destination we are headed to
   
   Yeah, this was the reason I suggested it. `compactionState` seemed a little ambiguous in this regard.



##########
server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not needed.
+ * Database-specific operations (cleanup, unused marking) are no-ops in this implementation.
+ */
+public class HeapMemoryCompactionStateManager implements CompactionStateManager
+{
+  private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = new ConcurrentHashMap<>();
+  private final ObjectMapper deterministicMapper;
+
+  /**
+   * Creates an in-memory compaction state manager with a default deterministic mapper.
+   * This is a convenience constructor for tests and simulations.
+   */
+  public HeapMemoryCompactionStateManager()
+  {
+    this(createDeterministicMapper());
+  }
+
+  /**
+   * Creates an in-memory compaction state manager with the provided deterministic mapper
+   * for fingerprint generation.
+   *
+   * @param deterministicMapper ObjectMapper configured for deterministic serialization
+   */
+  public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper)
+  {
+    this.deterministicMapper = deterministicMapper;
+  }
+
+  /**
+   * Creates an ObjectMapper configured for deterministic serialization.
+   * Used for generating consistent fingerprints.
+   */
+  private static ObjectMapper createDeterministicMapper()
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+    mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
+    return mapper;
+  }
+
+  @Override
+  @SuppressWarnings("UnstableApiUsage")
+  public String generateCompactionStateFingerprint(
+      final CompactionState compactionState,
+      final String dataSource
+  )
+  {
+    final Hasher hasher = Hashing.sha256().newHasher();

Review Comment:
   Yes, it makes sense to use the static utility or use the `DefaultFingerprintMapper` directly in tests.



##########
server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java:
##########
@@ -1106,6 +1122,89 @@ private void emitMetric(String datasource, String metric, long value)
     );
   }
 
+  /**
+   * Retrieves required used compaction states from the metadata store and resets
+   * them in the {@link CompactionStateCache}. If this is the first sync, all used
+   * compaction states are retrieved from the metadata store. If this is a delta sync,
+   * first only the fingerprints of all used compaction states are retrieved. Payloads are
+   * then fetched for only the fingerprints which are not present in the cache.
+   */
+  private void retrieveAndResetUsedCompactionStates()
+  {
+    final Stopwatch compactionStateSyncDuration = Stopwatch.createStarted();
+
+    // Reset the CompactionStateCache with latest compaction states
+    final Map<String, CompactionState> fingerprintToStateMap;
+    if (syncFinishTime.get() == null) {
+      fingerprintToStateMap = buildFingerprintToStateMapForFullSync();
+    } else {
+      fingerprintToStateMap = buildFingerprintToStateMapForDeltaSync();
+    }
+
+    compactionStateCache.resetCompactionStatesForPublishedSegments(fingerprintToStateMap);
+
+    // Emit metrics for the current contents of the cache
+    compactionStateCache.getStats().forEach(this::emitMetric);
+    emitMetric(Metric.RETRIEVE_COMPACTION_STATES_DURATION_MILLIS, compactionStateSyncDuration.millisElapsed());
+  }
+
+  /**
+   * Retrieves all used compaction states from the metadata store and builds a
+   * fresh map from compaction state fingerprint to state.
+   */
+  private Map<String, CompactionState> buildFingerprintToStateMapForFullSync()
+  {
+    final List<CompactionStateRecord> records = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStates
+    );
+
+    return records.stream().collect(
+        Collectors.toMap(
+            CompactionStateRecord::getFingerprint,
+            CompactionStateRecord::getState
+        )
+    );
+  }
+
+  /**
+   * Retrieves compaction states from the metadata store if they are not present
+   * in the cache or have been recently updated in the metadata store. These
+   * compaction states along with those already present in the cache are used to
+   * build a complete updated map from compaction state fingerprint to state.
+   *
+   * @return Complete updated map from compaction state fingerprint to state for all
+   * used compaction states currently persisted in the metadata store.
+   */
+  private Map<String, CompactionState> buildFingerprintToStateMapForDeltaSync()
+  {
+    // Identify fingerprints in the cache and in the metadata store
+    final Map<String, CompactionState> fingerprintToStateMap = new HashMap<>(
+        compactionStateCache.getPublishedCompactionStateMap()
+    );
+    final Set<String> cachedFingerprints = Set.copyOf(fingerprintToStateMap.keySet());
+    final Set<String> persistedFingerprints = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStateFingerprints
+    );
+
+    // Remove entry for compaction states that have been deleted from the metadata store
+    final Set<String> deletedFingerprints = Sets.difference(cachedFingerprints, persistedFingerprints);
+    deletedFingerprints.forEach(fingerprintToStateMap::remove);
+
+    // Retrieve and add entry for compaction states that have been added to the metadata store
+    final Set<String> addedFingerprints = Sets.difference(persistedFingerprints, cachedFingerprints);
+    final List<CompactionStateRecord> addedCompactionStateRecords = query(
+        sql -> sql.retrieveCompactionStatesForFingerprints(addedFingerprints)
+    );
+    if (addedCompactionStateRecords.size() < addedFingerprints.size()) {
+      emitMetric(Metric.SKIPPED_COMPACTION_STATES, addedFingerprints.size() - addedCompactionStateRecords.size());

Review Comment:
   I think just having the added and deleted metrics should suffice.
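   The delta-sync reconciliation described in the hunk above can be sketched with plain JDK collections. The real code uses Guava's `Sets.difference` and `SqlSegmentsMetadataQuery`; the method and parameter names below are illustrative only:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeltaSyncSketch
{
  // Reconciles the cached fingerprint -> payload map against the set of
  // fingerprints currently persisted in the store: drops deleted entries and
  // fetches payloads only for newly added fingerprints.
  static Map<String, String> deltaSync(
      Map<String, String> cached,         // fingerprint -> state payload (cache)
      Set<String> persistedFingerprints,  // all used fingerprints in the store
      Map<String, String> storePayloads   // payload lookup for added fingerprints
  )
  {
    Map<String, String> result = new HashMap<>(cached);

    // Remove entries deleted from the metadata store
    Set<String> deleted = new HashSet<>(cached.keySet());
    deleted.removeAll(persistedFingerprints);
    deleted.forEach(result::remove);

    // Fetch payloads only for fingerprints not already cached
    Set<String> added = new HashSet<>(persistedFingerprints);
    added.removeAll(cached.keySet());
    added.forEach(fp -> result.put(fp, storePayloads.get(fp)));

    return result;
  }

  public static void main(String[] args)
  {
    Map<String, String> cached = Map.of("a", "stateA", "b", "stateB");
    Set<String> persisted = Set.of("b", "c");
    Map<String, String> store = Map.of("b", "stateB", "c", "stateC");

    Map<String, String> synced = deltaSync(cached, persisted, store);
    System.out.println(synced.keySet().equals(Set.of("b", "c"))); // prints true
  }
}
```

   The point of the delta path is that only the `added` set requires payload fetches from the store, keeping the sync cheap when the cache is mostly up to date.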



##########
server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateStorage.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not needed.
+ * Database-specific operations (cleanup, unused marking) are no-ops in this implementation.
+ */
+public class HeapMemoryCompactionStateManager implements CompactionStateManager
+{
+  private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = new ConcurrentHashMap<>();
+  private final ObjectMapper deterministicMapper;
+
+  /**
+   * Creates an in-memory compaction state manager with a default deterministic mapper.
+   * This is a convenience constructor for tests and simulations.
+   */
+  public HeapMemoryCompactionStateManager()
+  {
+    this(createDeterministicMapper());
+  }
+
+  /**
+   * Creates an in-memory compaction state manager with the provided deterministic mapper
+   * for fingerprint generation.
+   *
+   * @param deterministicMapper ObjectMapper configured for deterministic serialization
+   */
+  public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper)
+  {
+    this.deterministicMapper = deterministicMapper;
+  }
+
+  /**
+   * Creates an ObjectMapper configured for deterministic serialization.
+   * Used for generating consistent fingerprints.
+   */
+  private static ObjectMapper createDeterministicMapper()

Review Comment:
   Yes, I was thinking about the `TestUtils` in `indexing-service`, didn't realize that it was in a downstream module though 😛. We can leave this as-is for now.



##########
server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java:
##########
@@ -89,4 +98,59 @@ public interface DataSourceCompactionConfig
 
   @Nullable
   AggregatorFactory[] getMetricsSpec();
+
+  /**
+   * Converts this compaction config to a {@link CompactionState}.
+   */
+  default CompactionState toCompactionState()

Review Comment:
   Thanks for adding this!



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable()
     }
   }
 
+  /**
+   * Creates the compaction states table for storing fingerprinted compaction states.
+   * <p>
+   * This table stores unique compaction states that are referenced by
+   * segments via fingerprints.
+   */
+  public void createCompactionStatesTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"

Review Comment:
   Ah, I see. thanks!



##########
server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java:
##########
@@ -785,13 +800,13 @@ private void retrieveAllUsedSegments(
     final String sql;
     if (useSchemaCache) {
       sql = StringUtils.format(
-          "SELECT id, payload, created_date, used_status_last_updated, schema_fingerprint, num_rows"
+          "SELECT id, payload, created_date, used_status_last_updated, compaction_state_fingerprint, schema_fingerprint, num_rows"

Review Comment:
   Sure, if it makes sense to you, I am okay with it too. 👍🏻 



##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java:
##########
@@ -86,7 +86,8 @@ public DruidCompactionConfig(
      @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
      @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy,
      @JsonProperty("useSupervisors") @Nullable Boolean useSupervisors,
-      @JsonProperty("engine") @Nullable CompactionEngine engine
+      @JsonProperty("engine") @Nullable CompactionEngine engine,
+      @JsonProperty("legacyPersistLastCompactionStateInSegments") @Nullable Boolean legacyPersistLastCompactionStateInSegments

Review Comment:
   Please update the config name.



##########
docs/api-reference/automatic-compaction-api.md:
##########
@@ -889,6 +889,7 @@ This includes the following fields:
 |`compactionPolicy`|Policy to choose intervals for compaction. Currently, the only supported policy is [Newest segment first](#compaction-policy-newestsegmentfirst).|Newest segment first|
 |`useSupervisors`|Whether compaction should be run on Overlord using supervisors instead of Coordinator duties.|false|
 |`engine`|Engine used for running compaction tasks, unless overridden in the datasource-level compaction config. Possible values are `native` and `msq`. `msq` engine can be used for compaction only if `useSupervisors` is `true`.|`native`|
+|`legacyPersistLastCompactionStateInSegments`|Whether to persist the full compaction state in segment metadata. When `true` (default), compaction state is stored in both the segment metadata and the compaction states table. This is historically how Druid has worked. When `false`, only a fingerprint reference is stored in the segment metadata, reducing storage overhead in the segments table. The actual compaction state is stored in the compaction states table and can be referenced with the aforementioned fingerprint. Eventually this configuration will be removed and all compaction will use the fingerprint method only. This configuration exists for operators to opt into this future pattern early. **WARNING: if you set this to false and then compact data, rolling back to a Druid version that predates compaction state fingerprinting (< Druid 36) will result in missing compaction states and trigger compaction on segments that may already be compacted.**|`true`|

Review Comment:
   Please update the config name.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +382,138 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (fingerprintMapper != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> f.apply(this))
+                                                               .filter(status -> !status.isComplete())
+                                                               .findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed all state checks
+      this.compactedSegments.addAll(
+          unknownStateToSegments
+              .values()
+              .stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toList())
+      );
 
       if (reasonsForCompaction.isEmpty()) {
         return COMPLETE;
       } else {
         return CompactionStatus.pending(
-            createStats(compactedSegments),
+            createStats(this.compactedSegments),
             createStats(uncompactedSegments),
             reasonsForCompaction.get(0)
         );
       }
     }
 
+    /**
+     * Evaluates the fingerprints of all fingerprinted candidate segments 
against the expected fingerprint.
+     * <p>
+     * If all fingerprinted segments have the expected fingerprint, the check 
can quickly pass as COMPLETE. However,
+     * if any fingerprinted segment has a mismatched fingerprint, we need to 
investigate further by adding them to
+     * {@link #unknownStateToSegments} where their compaction states will be 
analyzed.
+     * </p>
+     */
+    private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint()
+    {
+      Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new HashMap<>();
+      for (DataSegment segment : fingerprintedSegments) {
+        String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint == null) {
+          // Should not happen since we are iterating over fingerprintedSegments
+        } else if (fingerprint.equals(targetFingerprint)) {
+          compactedSegments.add(segment);
+        } else {
+          mismatchedFingerprintToSegmentMap
+              .computeIfAbsent(fingerprint, k -> new ArrayList<>())
+              .add(segment);
+        }
+      }
+
+      if (mismatchedFingerprintToSegmentMap.isEmpty()) {
+        // All fingerprinted segments have the expected fingerprint - compaction is complete
+        return COMPLETE;
+      }
+
+      if (fingerprintMapper == null) {
+        // Cannot evaluate further without a fingerprint mapper
+        uncompactedSegments.addAll(
+            mismatchedFingerprintToSegmentMap.values()
+                                            .stream()
+                                            .flatMap(List::stream)
+                                            .collect(Collectors.toList())
+        );
+        return CompactionStatus.pending("Segments have a mismatched fingerprint and no fingerprint mapper is available");
+      }
+
+      boolean fingerprintedSegmentWithoutCachedStateFound = false;
+
+      for (Map.Entry<String, List<DataSegment>> e : mismatchedFingerprintToSegmentMap.entrySet()) {
+        String fingerprint = e.getKey();
+        CompactionState stateToValidate = fingerprintMapper.getStateForFingerprint(fingerprint).orElse(null);
+        if (stateToValidate == null) {
+          log.warn("No compaction state found for fingerprint[%s]", fingerprint);
+          fingerprintedSegmentWithoutCachedStateFound = true;
+          uncompactedSegments.addAll(e.getValue());
+        } else {
+          // Note that this does not mean we need compaction yet - we need to validate the state further to determine this
+          unknownStateToSegments.compute(
+              stateToValidate,
+              (state, segments) -> {
+                if (segments == null) {
+                  segments = new ArrayList<>();
+                }
+                segments.addAll(e.getValue());
+                return segments;
+              });
+        }
+      }
+
+      if (fingerprintedSegmentWithoutCachedStateFound) {
+        return CompactionStatus.pending("One or more fingerprinted segments do not have a cached compaction state");

Review Comment:
   Good catch! I just assumed that if a segment has a fingerprint, the corresponding state would always be available in the cache. But I guess there can be error scenarios where this happens. Should we also include a WARN log here, since this shouldn't happen under normal conditions?
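   The fast-path grouping discussed in this thread can be sketched as follows. `Segment` is a stand-in record for `DataSegment`, and `groupMismatched` is a hypothetical name, not a Druid method:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FingerprintCheckSketch
{
  // Stand-in for DataSegment: just an id and a compaction state fingerprint.
  record Segment(String id, String fingerprint) {}

  // Segments whose fingerprint matches the target are considered compacted
  // (fast path); the rest are grouped by fingerprint so each distinct state
  // can be looked up once and validated further.
  static Map<String, List<Segment>> groupMismatched(
      List<Segment> segments,
      String targetFingerprint,
      List<Segment> compactedOut
  )
  {
    Map<String, List<Segment>> mismatched = new HashMap<>();
    for (Segment s : segments) {
      if (targetFingerprint.equals(s.fingerprint())) {
        compactedOut.add(s); // fast path: no state diff needed
      } else {
        mismatched.computeIfAbsent(s.fingerprint(), k -> new ArrayList<>()).add(s);
      }
    }
    return mismatched;
  }

  public static void main(String[] args)
  {
    List<Segment> compacted = new ArrayList<>();
    Map<String, List<Segment>> mismatched = groupMismatched(
        List.of(new Segment("s1", "fp-A"), new Segment("s2", "fp-B"), new Segment("s3", "fp-A")),
        "fp-A",
        compacted
    );
    System.out.println(compacted.size());              // prints 2
    System.out.println(mismatched.get("fp-B").size()); // prints 1
  }
}
```

   Grouping by fingerprint before the cache lookup means each distinct mismatched state is resolved once, regardless of how many segments carry it.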



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
