capistrant commented on code in PR #18844:
URL: https://github.com/apache/druid/pull/18844#discussion_r2659821441


##########
services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java:
##########
@@ -123,6 +132,15 @@ public void configure(Binder binder)
             .in(LazySingleton.class);
     }
 
+    // Overlord-only compaction state dependencies

Review Comment:
   nit: maybe move this into a single overlord-only conditional together with the stuff below?



##########
server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java:
##########
@@ -111,19 +114,22 @@ private void 
setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode)
   /**
    * Creates the target {@link #cache} to be tested in the current test.
    */
-  private void setupTargetWithCaching(SegmentMetadataCache.UsageMode 
cacheMode, boolean useSchemaCache)
+  private void setupTargetWithCaching(SegmentMetadataCache.UsageMode 
cacheMode, boolean useSchemaCache
+  )

Review Comment:
   nit: no newline



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages compaction state persistence and fingerprint generation.
+ * <p>
+ * Implementations may be backed by a database (like {@link 
PersistedCompactionStateManager}) or
+ * use in-memory storage (like {@link HeapMemoryCompactionStateManager}).
+ */
+public interface CompactionStateManager
+{
+  /**
+   * Generates a deterministic fingerprint for the given compaction state and 
datasource.
+   * The fingerprint is a SHA-256 hash of the datasource name and serialized 
compaction state.
+   *
+   * @param compactionState The compaction configuration to fingerprint
+   * @param dataSource The datasource name
+   * @return A hex-encoded SHA-256 fingerprint string
+   */
+  String generateCompactionStateFingerprint(CompactionState compactionState, 
String dataSource);
+
+  /**
+   * Persists compaction states to storage.
+   *
+   * @param dataSource The datasource name
+   * @param fingerprintToStateMap Map of fingerprints to their compaction 
states
+   * @param updateTime The timestamp for this update
+   */
+  void persistCompactionState(

Review Comment:
   Yeah, this is over-engineered for batching. I will simplify it.



##########
server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.Deterministic;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Database-backed implementation of {@link CompactionStateManager}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} objects in 
the metadata storage.
+ * Compaction states are uniquely identified by their fingerprints, which are 
SHA-256 hashes of their content. A cache
+ * of compaction states using the fingerprints as keys is maintained in memory 
to optimize retrieval performance.
+ * </p>
+ * <p>
+ * A striped locking mechanism is used to ensure thread-safe persistence of 
compaction states on a per-datasource basis.
+ * </p>
+ */
+@ManageLifecycle
+public class PersistedCompactionStateManager implements CompactionStateManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(PersistedCompactionStateManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final ObjectMapper deterministicMapper;
+  private final SQLMetadataConnector connector;
+  private final Striped<Lock> datasourceLocks = Striped.lock(128);
+
+  @Inject
+  public PersistedCompactionStateManager(
+      @Nonnull MetadataStorageTablesConfig dbTables,
+      @Nonnull ObjectMapper jsonMapper,
+      @Deterministic @Nonnull ObjectMapper deterministicMapper,
+      @Nonnull SQLMetadataConnector connector
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.deterministicMapper = deterministicMapper;
+    this.connector = connector;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+  }
+
+  @VisibleForTesting
+  PersistedCompactionStateManager()

Review Comment:
   I think I actually just forgot to delete this after removing its usage. It 
was showing as unused in my IDE. So yes, I will be nuking it.



##########
server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateManager.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not 
needed.
+ * Database-specific operations (cleanup, unused marking) are no-ops in this 
implementation.
+ */
+public class HeapMemoryCompactionStateManager implements CompactionStateManager
+{
+  private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = 
new ConcurrentHashMap<>();
+  private final ObjectMapper deterministicMapper;
+
+  /**
+   * Creates an in-memory compaction state manager with a default 
deterministic mapper.
+   * This is a convenience constructor for tests and simulations.
+   */
+  public HeapMemoryCompactionStateManager()
+  {
+    this(createDeterministicMapper());
+  }
+
+  /**
+   * Creates an in-memory compaction state manager with the provided 
deterministic mapper
+   * for fingerprint generation.
+   *
+   * @param deterministicMapper ObjectMapper configured for deterministic 
serialization
+   */
+  public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper)
+  {
+    this.deterministicMapper = deterministicMapper;
+  }
+
+  /**
+   * Creates an ObjectMapper configured for deterministic serialization.
+   * Used for generating consistent fingerprints.
+   */
+  private static ObjectMapper createDeterministicMapper()
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+    mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
+    return mapper;
+  }
+
+  @Override
+  @SuppressWarnings("UnstableApiUsage")
+  public String generateCompactionStateFingerprint(
+      final CompactionState compactionState,
+      final String dataSource
+  )
+  {
+    final Hasher hasher = Hashing.sha256().newHasher();

Review Comment:
   Yes, good point — the status quo is not great. I weighed some options, 
including these, and ended up going with a compaction-state fingerprint utility 
class with a private empty constructor and this static utility open for use. 
Having both implementations call into it appealed to me more than having the 
heap implementation call into the SQL manager.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java:
##########
@@ -31,27 +32,37 @@ public class CompactionJob extends BatchIndexingJob
 {
   private final CompactionCandidate candidate;
   private final int maxRequiredTaskSlots;
+  private final String compactionStateFingerprint;
+  private final CompactionState compactionState;

Review Comment:
   Hmm. I don't feel strongly either way. I don't think either is right or 
wrong, but I will go with yours since it indicates a destination we are 
headed toward.



##########
website/.spelling:
##########
@@ -483,6 +483,7 @@ pre-computation
 pre-compute
 pre-computed
 pre-computing
+pre-warms

Review Comment:
   nit: this might be removed now



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedCompactionStateTest.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.metadata.CompactionStateManager;
+import org.apache.druid.segment.metadata.PersistedCompactionStateManager;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.config.MetadataCleanupConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.timeline.CompactionState;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class KillUnreferencedCompactionStateTest
+{
+  @RegisterExtension
+  public static final TestDerbyConnector.DerbyConnectorRule5 
DERBY_CONNECTOR_RULE =
+      new TestDerbyConnector.DerbyConnectorRule5();
+
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  private TestDerbyConnector derbyConnector;
+  private MetadataStorageTablesConfig tablesConfig;
+  private CompactionStateManager compactionStateManager;
+  private DruidCoordinatorRuntimeParams mockParams;
+
+  @BeforeEach
+  public void setUp()
+  {
+    derbyConnector = DERBY_CONNECTOR_RULE.getConnector();
+    tablesConfig = DERBY_CONNECTOR_RULE.metadataTablesConfigSupplier().get();
+
+    derbyConnector.createCompactionStatesTable();
+    derbyConnector.createSegmentTable();
+
+    compactionStateManager = new PersistedCompactionStateManager(tablesConfig, 
jsonMapper, createDeterministicMapper(), derbyConnector);
+
+    mockParams = EasyMock.createMock(DruidCoordinatorRuntimeParams.class);
+    CoordinatorRunStats runStats = new CoordinatorRunStats();
+    
EasyMock.expect(mockParams.getCoordinatorStats()).andReturn(runStats).anyTimes();
+    EasyMock.replay(mockParams);
+  }
+
+  @Test
+  public void testKillUnreferencedCompactionState_lifecycle()
+  {
+    // Setup time progression: now, +1hr, +7hrs (past cleanup period and 
retention)
+    List<DateTime> dateTimes = new ArrayList<>();
+    DateTime now = DateTimes.nowUtc();
+    dateTimes.add(now);                         // Run 1: Mark as unused
+    dateTimes.add(now.plusMinutes(61));         // Run 2: Still in retention 
period
+    dateTimes.add(now.plusMinutes(6 * 60 + 1)); // Run 3: Past retention, 
delete
+
+    MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
+        true,
+        Period.parse("PT1H").toStandardDuration(),  // cleanup period
+        Period.parse("PT6H").toStandardDuration()   // retention duration
+    );
+
+    KillUnreferencedCompactionState duty =
+        new TestKillUnreferencedCompactionState(cleanupConfig, 
compactionStateManager, dateTimes);
+
+    // Insert a compaction state (initially marked as used)
+    String fingerprint = "test_fingerprint";
+    CompactionState state = createTestCompactionState();
+
+    derbyConnector.retryWithHandle(handle -> {
+      Map<String, CompactionState> map = new HashMap<>();
+      map.put(fingerprint, state);
+      compactionStateManager.persistCompactionState("test-ds", map, 
DateTimes.nowUtc());
+      return null;
+    });
+
+    assertEquals(Boolean.TRUE, getCompactionStateUsedStatus(fingerprint));
+
+    // Run 1: Should mark as unused (no segments reference it)
+    duty.run(mockParams);
+    assertEquals(Boolean.FALSE, getCompactionStateUsedStatus(fingerprint));
+
+    // Run 2: Still unused, but within retention period - should not delete
+    duty.run(mockParams);
+    assertNotNull(getCompactionStateUsedStatus(fingerprint));
+
+    // Run 3: Past retention period - should delete
+    duty.run(mockParams);
+    assertNull(getCompactionStateUsedStatus(fingerprint));
+  }
+
+  @Test
+  public void testKillUnreferencedCompactionState_repair()
+  {
+    List<DateTime> dateTimes = new ArrayList<>();
+    DateTime now = DateTimes.nowUtc();
+    dateTimes.add(now);
+    dateTimes.add(now.plusMinutes(61));
+
+    MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
+        true,
+        Period.parse("PT1H").toStandardDuration(),
+        Period.parse("PT6H").toStandardDuration()
+    );
+
+    KillUnreferencedCompactionState duty =
+        new TestKillUnreferencedCompactionState(cleanupConfig, 
compactionStateManager, dateTimes);
+
+    // Insert compaction state
+    String fingerprint = "repair_fingerprint";
+    CompactionState state = createTestCompactionState();
+
+    derbyConnector.retryWithHandle(handle -> {
+      Map<String, CompactionState> map = new HashMap<>();
+      map.put(fingerprint, state);
+      compactionStateManager.persistCompactionState("test-ds", map, 
DateTimes.nowUtc());
+      return null;
+    });
+
+    // Run 1: Mark as unused
+    duty.run(mockParams);
+    assertEquals(Boolean.FALSE, getCompactionStateUsedStatus(fingerprint));
+
+    // Now insert a used segment that references this fingerprint
+    derbyConnector.retryWithHandle(handle -> {
+      handle.createStatement(
+                "INSERT INTO " + tablesConfig.getSegmentsTable() + " "
+                + "(id, dataSource, created_date, start, \"end\", partitioned, 
version, used, payload, "
+                + "used_status_last_updated, compaction_state_fingerprint) "
+                + "VALUES (:id, :dataSource, :created_date, :start, :end, 
:partitioned, :version, :used, :payload, "
+                + ":used_status_last_updated, :compaction_state_fingerprint)"
+            )
+            .bind("id", "testSegment_2024-01-01_2024-01-02_v1_0")
+            .bind("dataSource", "test-ds")
+            .bind("created_date", DateTimes.nowUtc().toString())
+            .bind("start", "2024-01-01T00:00:00.000Z")
+            .bind("end", "2024-01-02T00:00:00.000Z")
+            .bind("partitioned", 0)
+            .bind("version", "v1")
+            .bind("used", true)
+            .bind("payload", new byte[]{})
+            .bind("used_status_last_updated", DateTimes.nowUtc().toString())
+            .bind("compaction_state_fingerprint", fingerprint)
+            .execute();
+      return null;
+    });
+
+    // Run 2: Repair - should mark it back as used
+    duty.run(mockParams);
+    assertEquals(Boolean.TRUE, getCompactionStateUsedStatus(fingerprint));
+  }
+
+  @Test
+  public void testKillUnreferencedCompactionState_disabled()
+  {
+    MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
+        false, // disabled
+        Period.parse("PT1H").toStandardDuration(),
+        Period.parse("PT6H").toStandardDuration()
+    );
+
+    KillUnreferencedCompactionState duty =
+        new KillUnreferencedCompactionState(cleanupConfig, 
compactionStateManager);
+
+    // Insert compaction state
+    String fingerprint = "disabled_fingerprint";
+    derbyConnector.retryWithHandle(handle -> {
+      Map<String, CompactionState> map = new HashMap<>();
+      map.put(fingerprint, createTestCompactionState());
+      compactionStateManager.persistCompactionState("test-ds", map, 
DateTimes.nowUtc());
+      return null;
+    });
+
+    // Run duty - should do nothing
+    duty.run(mockParams);
+
+    // Should still be used (not marked as unused)
+    assertEquals(Boolean.TRUE, getCompactionStateUsedStatus(fingerprint));
+  }
+
+  private static class TestKillUnreferencedCompactionState extends 
KillUnreferencedCompactionState
+  {
+    private final List<DateTime> dateTimes;
+    private int index = -1;
+
+    public TestKillUnreferencedCompactionState(
+        MetadataCleanupConfig config,
+        CompactionStateManager compactionStateManager,
+        List<DateTime> dateTimes
+    )
+    {
+      super(config, compactionStateManager);
+      this.dateTimes = dateTimes;
+    }
+
+    @Override
+    protected DateTime getCurrentTime()
+    {
+      index++;
+      return dateTimes.get(index);
+    }
+  }
+
+  private CompactionState createTestCompactionState()
+  {
+    return new CompactionState(
+        new DynamicPartitionsSpec(100, null),
+        null, null, null,
+        IndexSpec.getDefault(),
+        null, null
+    );
+  }
+
+  private Boolean getCompactionStateUsedStatus(String fingerprint)
+  {
+    List<Boolean> usedStatus = derbyConnector.retryWithHandle(
+        handle -> handle.createQuery(
+                            "SELECT used FROM " + 
tablesConfig.getCompactionStatesTable()
+                            + " WHERE fingerprint = :fp"
+                        )
+                        .bind("fp", fingerprint)
+                        .mapTo(Boolean.class)
+                        .list()
+    );
+
+    return usedStatus.isEmpty() ? null : usedStatus.get(0);
+  }
+
+  private static ObjectMapper createDeterministicMapper()

Review Comment:
   does this need to react to the new way we are creating the mapper?



##########
server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java:
##########
@@ -785,13 +800,13 @@ private void retrieveAllUsedSegments(
     final String sql;
     if (useSchemaCache) {
       sql = StringUtils.format(
-          "SELECT id, payload, created_date, used_status_last_updated, 
schema_fingerprint, num_rows"
+          "SELECT id, payload, created_date, used_status_last_updated, 
compaction_state_fingerprint, schema_fingerprint, num_rows"

Review Comment:
   Hm. I intentionally did this to make all of the indexes static. Putting 
compaction_state_fingerprint at the end would result in its own index being 
dynamic, which felt worse than a re-shuffle. Thoughts?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable()
     }
   }
 
+  /**
+   * Creates the compaction states table for storing fingerprinted compaction 
states
+   * <p>
+   * This table stores unique compaction states that are referenced by
+   * segments via fingerprints.
+   */
+  public void createCompactionStatesTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"

Review Comment:
   Unfortunately, it looks like usage is mixed across tables between datasource 
and dataSource. But since the oldest Druid tables use camelCase, I will indeed 
change this for the compactionStates table.



##########
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages compaction state persistence and fingerprint generation.
+ * <p>
+ * Implementations may be backed by a database (like {@link 
PersistedCompactionStateManager}) or
+ * use in-memory storage (like {@link HeapMemoryCompactionStateManager}).
+ */
+public interface CompactionStateManager
+{
+  /**
+   * Generates a deterministic fingerprint for the given compaction state and 
datasource.
+   * The fingerprint is a SHA-256 hash of the datasource name and serialized 
compaction state.
+   *
+   * @param compactionState The compaction configuration to fingerprint
+   * @param dataSource The datasource name
+   * @return A hex-encoded SHA-256 fingerprint string
+   */
+  String generateCompactionStateFingerprint(CompactionState compactionState, 
String dataSource);
+
+  /**
+   * Persists compaction states to storage.
+   *
+   * @param dataSource The datasource name
+   * @param fingerprintToStateMap Map of fingerprints to their compaction 
states
+   * @param updateTime The timestamp for this update
+   */
+  void persistCompactionState(
+      String dataSource,
+      Map<String, CompactionState> fingerprintToStateMap,
+      DateTime updateTime
+  );
+
+  /**
+   * Marks compaction states as unused if they are not referenced by any used 
segments.
+   * This is used for cleanup operations. Implementations may choose to no-op 
this.

Review Comment:
   Fair point — I'll remove the default impl and re-word the doc.



##########
server/src/main/java/org/apache/druid/segment/metadata/PersistedCompactionStateManager.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.Deterministic;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Database-backed implementation of {@link CompactionStateManager}.
+ * <p>
+ * Manages the persistence and retrieval of {@link CompactionState} objects in 
the metadata storage.
+ * Compaction states are uniquely identified by their fingerprints, which are 
SHA-256 hashes of their content. A cache
+ * of compaction states using the fingerprints as keys is maintained in memory 
to optimize retrieval performance.
+ * </p>
+ * <p>
+ * A striped locking mechanism is used to ensure thread-safe persistence of 
compaction states on a per-datasource basis.
+ * </p>
+ */
+@ManageLifecycle
+public class PersistedCompactionStateManager implements CompactionStateManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(PersistedCompactionStateManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final ObjectMapper deterministicMapper;
+  private final SQLMetadataConnector connector;
+  private final Striped<Lock> datasourceLocks = Striped.lock(128);
+
+  @Inject
+  public PersistedCompactionStateManager(
+      @Nonnull MetadataStorageTablesConfig dbTables,
+      @Nonnull ObjectMapper jsonMapper,
+      @Deterministic @Nonnull ObjectMapper deterministicMapper,
+      @Nonnull SQLMetadataConnector connector
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.deterministicMapper = deterministicMapper;
+    this.connector = connector;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+  }
+
+  @VisibleForTesting
+  PersistedCompactionStateManager()
+  {
+    this.dbTables = null;
+    this.jsonMapper = null;
+    this.deterministicMapper = null;
+    this.connector = null;
+  }
+
+  @Override
+  public void persistCompactionState(
+      final String dataSource,
+      final Map<String, CompactionState> fingerprintToStateMap,
+      final DateTime updateTime
+  )
+  {
+    if (fingerprintToStateMap.isEmpty()) {
+      return;
+    }
+
+    final Lock lock = datasourceLocks.get(dataSource);

Review Comment:
   Yes, the intention was to provide protection against unique-constraint 
exceptions. I agree that it is over-engineered for the system as it currently 
is. When I was weighing the alternative of catching and ignoring the 
unique-constraint exceptions, I didn't like how hacky it looked. I will revert 
back to that approach, though, as I get that this added complexity is probably 
worse.



##########
server/src/test/java/org/apache/druid/segment/metadata/HeapMemoryCompactionStateStorage.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory implementation of {@link CompactionStateManager} that stores
+ * compaction state fingerprints in heap memory without requiring a database.
+ * <p>
+ * Useful for simulations and unit tests where database persistence is not 
needed.
+ * Database-specific operations (cleanup, unused marking) are no-ops in this 
implementation.
+ */
+public class HeapMemoryCompactionStateManager implements CompactionStateManager
+{
+  private final ConcurrentMap<String, CompactionState> fingerprintToStateMap = 
new ConcurrentHashMap<>();
+  private final ObjectMapper deterministicMapper;
+
+  /**
+   * Convenience constructor for tests and simulations: delegates to
+   * {@link #createDeterministicMapper()} so that generated fingerprints are
+   * consistent across runs.
+   */
+  public HeapMemoryCompactionStateManager()
+  {
+    this(createDeterministicMapper());
+  }
+
+  /**
+   * Creates an in-memory compaction state manager that uses the given mapper
+   * for fingerprint generation.
+   *
+   * @param deterministicMapper ObjectMapper configured for deterministic
+   *                            serialization; must be non-null
+   *                            (NOTE(review): no null check is performed here)
+   */
+  public HeapMemoryCompactionStateManager(ObjectMapper deterministicMapper)
+  {
+    this.deterministicMapper = deterministicMapper;
+  }
+
+  /**
+   * Creates an ObjectMapper configured for deterministic serialization.
+   * Used for generating consistent fingerprints.
+   */
+  private static ObjectMapper createDeterministicMapper()

Review Comment:
   not clear to me where you are indicating. I see TestUtils in 
indexing-service module which isn't visible from here. Plus there is TestUtils 
in processing module that is visible, but that is for cgroups metrics stuff and 
feels like not what you are indicating.



##########
server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java:
##########
@@ -1106,6 +1122,89 @@ private void emitMetric(String datasource, String 
metric, long value)
     );
   }
 
+  /**
+   * Retrieves required used compaction states from the metadata store and 
resets
+   * them in the {@link CompactionStateCache}. If this is the first sync, all 
used
+   * compaction states are retrieved from the metadata store. If this is a 
delta sync,
+   * first only the fingerprints of all used compaction states are retrieved. 
Payloads are
+   * then fetched for only the fingerprints which are not present in the cache.
+   */
+  private void retrieveAndResetUsedCompactionStates()
+  {
+    final Stopwatch compactionStateSyncDuration = Stopwatch.createStarted();
+
+    // Reset the CompactionStateCache with latest compaction states
+    final Map<String, CompactionState> fingerprintToStateMap;
+    if (syncFinishTime.get() == null) {
+      fingerprintToStateMap = buildFingerprintToStateMapForFullSync();
+    } else {
+      fingerprintToStateMap = buildFingerprintToStateMapForDeltaSync();
+    }
+
+    
compactionStateCache.resetCompactionStatesForPublishedSegments(fingerprintToStateMap);
+
+    // Emit metrics for the current contents of the cache
+    compactionStateCache.getStats().forEach(this::emitMetric);
+    emitMetric(Metric.RETRIEVE_COMPACTION_STATES_DURATION_MILLIS, 
compactionStateSyncDuration.millisElapsed());
+  }
+
+  /**
+   * Retrieves all used compaction states from the metadata store and builds a
+   * fresh map from compaction state fingerprint to state.
+   */
+  private Map<String, CompactionState> buildFingerprintToStateMapForFullSync()
+  {
+    final List<CompactionStateRecord> records = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStates
+    );
+
+    return records.stream().collect(
+        Collectors.toMap(
+            CompactionStateRecord::getFingerprint,
+            CompactionStateRecord::getState
+        )
+    );
+  }
+
+  /**
+   * Retrieves compaction states from the metadata store if they are not 
present
+   * in the cache or have been recently updated in the metadata store. These
+   * compaction states along with those already present in the cache are used 
to
+   * build a complete updated map from compaction state fingerprint to state.
+   *
+   * @return Complete updated map from compaction state fingerprint to state 
for all
+   * used compaction states currently persisted in the metadata store.
+   */
+  private Map<String, CompactionState> buildFingerprintToStateMapForDeltaSync()
+  {
+    // Identify fingerprints in the cache and in the metadata store
+    final Map<String, CompactionState> fingerprintToStateMap = new HashMap<>(
+        compactionStateCache.getPublishedCompactionStateMap()
+    );
+    final Set<String> cachedFingerprints = 
Set.copyOf(fingerprintToStateMap.keySet());
+    final Set<String> persistedFingerprints = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedCompactionStateFingerprints
+    );
+
+    // Remove entry for compaction states that have been deleted from the 
metadata store
+    final Set<String> deletedFingerprints = 
Sets.difference(cachedFingerprints, persistedFingerprints);
+    deletedFingerprints.forEach(fingerprintToStateMap::remove);
+
+    // Retrieve and add entry for compaction states that have been added to 
the metadata store
+    final Set<String> addedFingerprints = 
Sets.difference(persistedFingerprints, cachedFingerprints);
+    final List<CompactionStateRecord> addedCompactionStateRecords = query(
+        sql -> sql.retrieveCompactionStatesForFingerprints(addedFingerprints)
+    );
+    if (addedCompactionStateRecords.size() < addedFingerprints.size()) {
+      emitMetric(Metric.SKIPPED_COMPACTION_STATES, addedFingerprints.size() - 
addedCompactionStateRecords.size());

Review Comment:
   sure, I kinda fell into the trap of just replicating what the schema cache
metric was. Do you want to delete this one, or just add the added and deleted
metrics on top of this?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -1097,6 +1100,55 @@ public void createSegmentSchemasTable()
     }
   }
 
+  /**
+   * Creates the compaction states table for storing fingerprinted compaction 
states
+   * <p>
+   * This table stores unique compaction states that are referenced by
+   * segments via fingerprints.
+   */
+  public void createCompactionStatesTable(final String tableName)
+  {
+    createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"
+                + "  fingerprint VARCHAR(255) NOT NULL,\n"
+                + "  payload %3$s NOT NULL,\n"
+                + "  used BOOLEAN NOT NULL,\n"
+                + "  used_status_last_updated VARCHAR(255) NOT NULL,\n"
+                + "  PRIMARY KEY (id),\n"
+                + "  UNIQUE (fingerprint)\n"

Review Comment:
   uh I guess it probably doesn't matter? Maybe just a force of habit for 
people creating rdbms tables to throw in a serial id PK without thinking.



##########
server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java:
##########
@@ -45,21 +45,34 @@ public class ClusterCompactionConfig
   private final boolean useSupervisors;
   private final CompactionEngine engine;
   private final CompactionCandidateSearchPolicy compactionPolicy;
+  /**
+   * Whether to persist last compaction state directly in segments for 
backwards compatibility.
+   * <p>
+   * In a future release this option will be removed and last compaction state 
will no longer be persisted in segments.
+   * Instead, it will only be stored in the metadata store with a fingerprint 
id that segments will reference. Some
+   * operators may want to disable this behavior early to begin saving space 
in segment metadatastore table entries.
+   */
+  private final boolean legacyPersistLastCompactionStateInSegments;
 
   @JsonCreator
   public ClusterCompactionConfig(
       @JsonProperty("compactionTaskSlotRatio") @Nullable Double 
compactionTaskSlotRatio,
       @JsonProperty("maxCompactionTaskSlots") @Nullable Integer 
maxCompactionTaskSlots,
       @JsonProperty("compactionPolicy") @Nullable 
CompactionCandidateSearchPolicy compactionPolicy,
       @JsonProperty("useSupervisors") @Nullable Boolean useSupervisors,
-      @JsonProperty("engine") @Nullable CompactionEngine engine
+      @JsonProperty("engine") @Nullable CompactionEngine engine,
+      @JsonProperty("legacyPersistLastCompactionStateInSegments") Boolean 
legacyPersistLastCompactionStateInSegments

Review Comment:
   good call



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +388,143 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = 
segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (compactionStateCache != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> 
f.apply(this))
+                                                               .filter(status 
-> !status.isComplete())
+                                                               
.findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed 
all state checks
+      final List<DataSegment> allCompactedSegments = new 
ArrayList<>(this.compactedSegments);
+      allCompactedSegments.addAll(
+          unknownStateToSegments
+              .values()
+              .stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toList())
+      );
 
       if (reasonsForCompaction.isEmpty()) {
         return COMPLETE;
       } else {
         return CompactionStatus.pending(
-            createStats(compactedSegments),
+            createStats(allCompactedSegments),
             createStats(uncompactedSegments),
             reasonsForCompaction.get(0)
         );
       }
     }
 
+    /**
+     * Evaluates the fingerprints of all fingerprinted candidate segments 
against the expected fingerprint.
+     * <p>
+     * If all fingerprinted segments have the expected fingerprint, the check 
can quickly pass as COMPLETE. However,
+     * if any fingerprinted segment has a mismatched fingerprint, we need to 
investigate further by adding them to
+     * {@link #unknownStateToSegments} where their compaction states will be 
analyzed.
+     * </p>
+     */
+    private CompactionStatus 
allFingerprintedCandidatesHaveExpectedFingerprint()
+    {
+      Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new 
HashMap<>();
+      for (DataSegment segment : fingerprintedSegments) {
+        String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null && !fingerprint.equals(targetFingerprint)) {
+          mismatchedFingerprintToSegmentMap
+              .computeIfAbsent(fingerprint, k -> new ArrayList<>())
+              .add(segment);
+        } else if (fingerprint != null && 
fingerprint.equals(targetFingerprint)) {
+          // Segment has correct fingerprint - add to compacted segments
+          compactedSegments.add(segment);
+        }
+      }
+
+      if (mismatchedFingerprintToSegmentMap.isEmpty()) {
+        return COMPLETE;
+      }
+
+      boolean fingerprintedSegmentNeedingCompactionFound = false;
+
+      if (compactionStateCache != null) {

Review Comment:
   > Just to confirm, the compactionStateCache can be null only when running 
the legacy CompactSegments duty on the Coordinator, right? It is never expected 
to be null when running supervisors on the Overlord?
   
   correct. and I guess we should really never get here in that case either 
since there is a guard on it in evaluate() as it is now so this is more 
defensive



##########
multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1699,6 +1700,12 @@ private void handleQueryResults(
               Tasks.DEFAULT_STORE_COMPACTION_STATE
           );
 
+      final String compactionStateFingerprint = querySpec.getContext()
+          .getString(
+              Tasks.COMPACTION_STATE_FINGERPRINT_KEY,
+              null
+          );

Review Comment:
   ya, good point — didn't realize there was one that uses a null default



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -674,4 +814,59 @@ private static CompactionStatistics 
createStats(List<DataSegment> segments)
       return CompactionStatistics.create(totalBytes, segments.size(), 
segmentIntervals.size());
     }
   }
+
+  /**
+   * Given a {@link DataSourceCompactionConfig}, create a {@link 
CompactionState}
+   */
+  public static CompactionState 
createCompactionStateFromConfig(DataSourceCompactionConfig config)

Review Comment:
   good question. I have actually been moving it around a few times trying to 
decide best place. I do like your idea so I'll try that out and we can see how 
we feel about it



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java:
##########
@@ -351,40 +388,143 @@ private CompactionStatus evaluate()
         return inputBytesCheck;
       }
 
-      final List<String> reasonsForCompaction =
+      List<String> reasonsForCompaction = new ArrayList<>();
+      CompactionStatus compactedOnceCheck = 
segmentsHaveBeenCompactedAtLeastOnce();
+      if (!compactedOnceCheck.isComplete()) {
+        reasonsForCompaction.add(compactedOnceCheck.getReason());
+      }
+
+      if (compactionStateCache != null && targetFingerprint != null) {
+        // First try fingerprint-based evaluation (fast path)
+        CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
+                                                               .map(f -> 
f.apply(this))
+                                                               .filter(status 
-> !status.isComplete())
+                                                               
.findFirst().orElse(COMPLETE);
+
+        if (!fingerprintStatus.isComplete()) {
+          reasonsForCompaction.add(fingerprintStatus.getReason());
+        }
+      }
+
+      reasonsForCompaction.addAll(
           CHECKS.stream()
                 .map(f -> f.apply(this))
                 .filter(status -> !status.isComplete())
                 .map(CompactionStatus::getReason)
-                .collect(Collectors.toList());
+                .collect(Collectors.toList())
+      );
 
       // Consider segments which have passed all checks to be compacted
-      final List<DataSegment> compactedSegments = unknownStateToSegments
-          .values()
-          .stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+      // Includes segments with correct fingerprints and segments that passed 
all state checks
+      final List<DataSegment> allCompactedSegments = new 
ArrayList<>(this.compactedSegments);
+      allCompactedSegments.addAll(
+          unknownStateToSegments
+              .values()
+              .stream()
+              .flatMap(List::stream)
+              .collect(Collectors.toList())
+      );
 
       if (reasonsForCompaction.isEmpty()) {
         return COMPLETE;
       } else {
         return CompactionStatus.pending(
-            createStats(compactedSegments),
+            createStats(allCompactedSegments),
             createStats(uncompactedSegments),
             reasonsForCompaction.get(0)
         );
       }
     }
 
+    /**
+     * Evaluates the fingerprints of all fingerprinted candidate segments 
against the expected fingerprint.
+     * <p>
+     * If all fingerprinted segments have the expected fingerprint, the check 
can quickly pass as COMPLETE. However,
+     * if any fingerprinted segment has a mismatched fingerprint, we need to 
investigate further by adding them to
+     * {@link #unknownStateToSegments} where their compaction states will be 
analyzed.
+     * </p>
+     */
+    private CompactionStatus 
allFingerprintedCandidatesHaveExpectedFingerprint()
+    {
+      Map<String, List<DataSegment>> mismatchedFingerprintToSegmentMap = new 
HashMap<>();
+      for (DataSegment segment : fingerprintedSegments) {
+        String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null && !fingerprint.equals(targetFingerprint)) {
+          mismatchedFingerprintToSegmentMap
+              .computeIfAbsent(fingerprint, k -> new ArrayList<>())
+              .add(segment);
+        } else if (fingerprint != null && 
fingerprint.equals(targetFingerprint)) {
+          // Segment has correct fingerprint - add to compacted segments
+          compactedSegments.add(segment);
+        }
+      }
+
+      if (mismatchedFingerprintToSegmentMap.isEmpty()) {
+        return COMPLETE;
+      }
+
+      boolean fingerprintedSegmentNeedingCompactionFound = false;
+
+      if (compactionStateCache != null) {
+        for (Map.Entry<String, List<DataSegment>> e : 
mismatchedFingerprintToSegmentMap.entrySet()) {
+          String fingerprint = e.getKey();
+          CompactionState stateToValidate = 
compactionStateCache.getCompactionStateByFingerprint(fingerprint).orElse(null);
+          if (stateToValidate == null) {
+            log.warn("No compaction state found for fingerprint[%s]", 
fingerprint);
+            fingerprintedSegmentNeedingCompactionFound = true;
+            uncompactedSegments.addAll(e.getValue());
+          } else {
+            // Note that this does not mean we need compaction yet - we need 
to validate the state further to determine this
+            unknownStateToSegments.compute(
+                stateToValidate,
+                (state, segments) -> {
+                  if (segments == null) {
+                    segments = new ArrayList<>();
+                  }
+                  segments.addAll(e.getValue());
+                  return segments;
+                }
+            );
+          }
+        }
+      } else {
+        for (Map.Entry<String, List<DataSegment>> e : 
mismatchedFingerprintToSegmentMap.entrySet()) {
+          uncompactedSegments.addAll(e.getValue());
+          fingerprintedSegmentNeedingCompactionFound = true;
+        }
+      }
+
+      if (fingerprintedSegmentNeedingCompactionFound) {
+        return CompactionStatus.pending("At least one segment has a mismatched 
fingerprint and needs compaction");
+      } else {
+        return COMPLETE;
+      }
+    }
+
+    /**
+     * Divvys up segments by certain characteristics and determines if any 
segments have never been compacted.
+     * <p>
+     * Segments are categorized into three groups:
+     * <ul>
+     *   <li>fingerprinted - segments who have a compaction state fingerprint 
and need more investigation before adding to {@link 
#unknownStateToSegments}</li>
+     *   <li>non-fingerprinted with a lastCompactionState - segments who have 
no fingerprint but have stored a lastCompactionState that needs to be 
analyzed</li>
+     *   <li>uncompacted - segments who have neither a fingerprint nor a 
lastCompactionState and thus definitely need compaction</li>
+     * </ul>
+     * </p>
+     */
     private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
     {
-      // Identify the compaction states of all the segments
       for (DataSegment segment : candidateSegments.getSegments()) {
-        final CompactionState segmentState = segment.getLastCompactionState();
-        if (segmentState == null) {
-          uncompactedSegments.add(segment);
+        final String fingerprint = segment.getCompactionStateFingerprint();
+        if (fingerprint != null) {
+          fingerprintedSegments.add(segment);
         } else {
-          unknownStateToSegments.computeIfAbsent(segmentState, s -> new 
ArrayList<>()).add(segment);
+          final CompactionState segmentState = 
segment.getLastCompactionState();
+          if (segmentState == null) {
+            uncompactedSegments.add(segment);
+          } else {
+            unknownStateToSegments.computeIfAbsent(segmentState, k -> new 
ArrayList<>()).add(segment);
+          }

Review Comment:
   ty for calling out the not-ideal conditional choices in this file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to