This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d77ef04f40 Cleanup usages of stopwatch (#16478)
9d77ef04f40 is described below

commit 9d77ef04f40d77b2fc1c17f4558845fcebe9492c
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon May 27 23:08:46 2024 +0530

    Cleanup usages of stopwatch (#16478)
    
    Changes:
    - Remove synchronized methods from `Stopwatch`
    - Access stopwatch methods in `ChangeRequestHttpSyncer` inside a lock
---
 .../apache/druid/java/util/common/Stopwatch.java   |  17 ++--
 .../apache/druid/discovery/DruidLeaderClient.java  |   4 +-
 .../druid/metadata/SqlSegmentsMetadataManager.java | 102 ++++++++-------------
 .../metadata/SegmentSchemaBackFillQueue.java       |  29 +++---
 .../coordination/ChangeRequestHttpSyncer.java      |  40 ++++----
 5 files changed, 89 insertions(+), 103 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java 
b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java
index 2d941828a0e..ab48fd7f9b0 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java
@@ -25,11 +25,8 @@ import org.joda.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
- * <p>
- * Thread safety has been limited to the start/stop methods for now as they are
- * the only ones that can throw an exception in an illegal state and are thus
- * vulnerable to race conditions.
+ * Wrapper over {@link com.google.common.base.Stopwatch} to provide some 
utility
+ * methods such as {@link #millisElapsed()}, {@link #restart()}, {@link 
#hasElapsed(Duration)}.
  */
 public class Stopwatch
 {
@@ -55,17 +52,17 @@ public class Stopwatch
     this.delegate = delegate;
   }
 
-  public synchronized void start()
+  public void start()
   {
     delegate.start();
   }
 
-  public synchronized void stop()
+  public void stop()
   {
     delegate.stop();
   }
 
-  public synchronized void reset()
+  public void reset()
   {
     delegate.reset();
   }
@@ -73,12 +70,12 @@ public class Stopwatch
   /**
    * Invokes {@code reset().start()} on the underlying {@link 
com.google.common.base.Stopwatch}.
    */
-  public synchronized void restart()
+  public void restart()
   {
     delegate.reset().start();
   }
 
-  public synchronized boolean isRunning()
+  public boolean isRunning()
   {
     return delegate.isRunning();
   }
diff --git 
a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java 
b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 4ca1441f6f2..28e48b29aa4 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -67,9 +67,9 @@ public class DruidLeaderClient
 
   private final String leaderRequestPath;
 
-  private LifecycleLock lifecycleLock = new LifecycleLock();
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
   private DruidNodeDiscovery druidNodeDiscovery;
-  private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
+  private final AtomicReference<String> currentKnownLeader = new 
AtomicReference<>();
 
   public DruidLeaderClient(
       HttpClient httpClient,
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 590a61d78d0..209deb41e7e 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -71,12 +71,9 @@ import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1036,42 +1033,26 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
     final Stopwatch stopwatch = Stopwatch.createStarted();
     log.info("Starting polling of segment table.");
 
-    // some databases such as PostgreSQL require auto-commit turned off
+    // Some databases such as PostgreSQL require auto-commit turned off
     // to stream results back, enabling transactions disables auto-commit
-    //
     // setting connection to read-only will allow some database such as MySQL
     // to automatically use read-only transaction mode, further optimizing the 
query
     final List<DataSegment> segments = connector.inReadOnlyTransaction(
-        new TransactionCallback<List<DataSegment>>()
-        {
-          @Override
-          public List<DataSegment> inTransaction(Handle handle, 
TransactionStatus status)
-          {
-            return handle
-                .createQuery(StringUtils.format("SELECT payload FROM %s WHERE 
used=true", getSegmentsTable()))
-                .setFetchSize(connector.getStreamingFetchSize())
-                .map(
-                    new ResultSetMapper<DataSegment>()
-                    {
-                      @Override
-                      public DataSegment map(int index, ResultSet r, 
StatementContext ctx) throws SQLException
-                      {
-                        try {
-                          DataSegment segment = 
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
-                          return replaceWithExistingSegmentIfPresent(segment);
-                        }
-                        catch (IOException e) {
-                          log.makeAlert(e, "Failed to read segment from 
db.").emit();
-                          // If one entry in database is corrupted doPoll() 
should continue to work overall. See
-                          // filter by `Objects::nonNull` below in this method.
-                          return null;
-                        }
-                      }
-                    }
-                )
-                .list();
-          }
-        }
+        (handle, status) -> handle
+            .createQuery(StringUtils.format("SELECT payload FROM %s WHERE 
used=true", getSegmentsTable()))
+            .setFetchSize(connector.getStreamingFetchSize())
+            .map((index, r, ctx) -> {
+              try {
+                DataSegment segment = 
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                return replaceWithExistingSegmentIfPresent(segment);
+              }
+              catch (IOException e) {
+                log.makeAlert(e, "Failed to read segment from db.").emit();
+                // If one entry in database is corrupted doPoll() should 
continue to work overall. See
+                // filter by `Objects::nonNull` below in this method.
+                return null;
+              }
+            }).list()
     );
 
     Preconditions.checkNotNull(
@@ -1082,11 +1063,13 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
     if (segments.isEmpty()) {
       log.info("No segments found in the database!");
     } else {
-      log.info("Polled and found [%,d] segments in the database in [%,d] ms.", 
segments.size(), stopwatch.millisElapsed());
+      log.info(
+          "Polled and found [%,d] segments in the database in [%,d] ms.",
+          segments.size(), stopwatch.millisElapsed()
+      );
     }
-    stopwatch.restart();
 
-    createDatasourcesSnapshot(stopwatch, segments);
+    createDatasourcesSnapshot(segments);
   }
 
   private void doPollSegmentAndSchema()
@@ -1157,25 +1140,18 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
         (handle, status) -> {
           handle.createQuery(schemaPollQuery)
                 .setFetchSize(connector.getStreamingFetchSize())
-                .map(
-                    new ResultSetMapper<Void>()
-                    {
-                      @Override
-                      public Void map(int index, ResultSet r, StatementContext 
ctx) throws SQLException
-                      {
-                        try {
-                          schemaMapBuilder.put(
-                              r.getString("fingerprint"),
-                              jsonMapper.readValue(r.getBytes("payload"), 
SchemaPayload.class)
-                          );
-                        }
-                        catch (IOException e) {
-                          log.makeAlert(e, "Failed to read schema from 
db.").emit();
-                        }
-                        return null;
-                      }
-                    })
-                .list();
+                .map((index, r, ctx) -> {
+                  try {
+                    schemaMapBuilder.put(
+                        r.getString("fingerprint"),
+                        jsonMapper.readValue(r.getBytes("payload"), 
SchemaPayload.class)
+                    );
+                  }
+                  catch (IOException e) {
+                    log.makeAlert(e, "Failed to read schema from db.").emit();
+                  }
+                  return null;
+                }).list();
 
           
segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
           return null;
@@ -1195,19 +1171,17 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
       log.info("No segments found in the database!");
     } else {
       log.info(
-          "Polled and found total [%,d] segments and [%,d] schema in the 
database in [%,d] ms.",
-          segments.size(),
-          schemaMap.size(),
-          stopwatch.millisElapsed()
+          "Polled and found [%,d] segments and [%,d] schemas in the database 
in [%,d] ms.",
+          segments.size(), schemaMap.size(), stopwatch.millisElapsed()
       );
     }
-    stopwatch.restart();
 
-    createDatasourcesSnapshot(stopwatch, segments);
+    createDatasourcesSnapshot(segments);
   }
 
-  private void createDatasourcesSnapshot(Stopwatch stopwatch, 
List<DataSegment> segments)
+  private void createDatasourcesSnapshot(List<DataSegment> segments)
   {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
     // dataSourcesSnapshot is updated only here and the DataSourcesSnapshot 
object is immutable. If data sources or
     // segments are marked as used or unused directly (via markAs...() methods 
in SegmentsMetadataManager), the
     // dataSourcesSnapshot can become invalid until the next database poll.
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
index 66ce9ed4bde..7855e11da37 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
@@ -28,6 +28,7 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.SchemaPayload;
 import org.apache.druid.segment.SchemaPayloadPlus;
@@ -159,13 +160,11 @@ public class SegmentSchemaBackFillQueue
       return;
     }
 
-    Stopwatch stopwatch = Stopwatch.createStarted();
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Backfilling segment schema. Queue size is [%s].", queue.size());
 
-    log.info("Backfilling segment schema. Queue size is [%s]", queue.size());
-
-    int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
-
-    Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
+    final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
+    final Map<String, List<SegmentSchemaMetadataPlus>> polled = new 
HashMap<>();
     for (int i = 0; i < itemsToProcess; i++) {
       SegmentSchemaMetadataPlus item = queue.poll();
       if (item != null) {
@@ -175,21 +174,29 @@ public class SegmentSchemaBackFillQueue
 
     for (Map.Entry<String, List<SegmentSchemaMetadataPlus>> entry : 
polled.entrySet()) {
       try {
-        
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), 
entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
+        segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
+            entry.getKey(),
+            entry.getValue(),
+            CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+        );
+
         // Mark the segments as published in the cache.
         for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
           
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
         }
         emitter.emit(
             ServiceMetricEvent.builder()
-                                       .setDimension("dataSource", 
entry.getKey())
-                                       
.setMetric("metadatacache/backfill/count", entry.getValue().size())
+                              .setDimension(DruidMetrics.DATASOURCE, 
entry.getKey())
+                              .setMetric("metadatacache/backfill/count", 
entry.getValue().size())
         );
       }
       catch (Exception e) {
-        log.error(e, "Exception persisting schema and updating segments table 
for datasource [%s].", entry.getKey());
+        log.error(e, "Exception persisting schema and updating segments table 
for datasource[%s].", entry.getKey());
       }
     }
-    
emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time",
 stopwatch.millisElapsed()));
+    emitter.emit(
+        ServiceMetricEvent.builder()
+                          .setMetric("metadatacache/backfill/time", 
stopwatch.millisElapsed())
+    );
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
index 2d256b68806..ef14b901b6c 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
@@ -82,9 +82,13 @@ public class ChangeRequestHttpSyncer<T>
   private final CountDownLatch initializationLatch = new CountDownLatch(1);
 
   /**
-   * This lock is used to ensure proper start-then-stop semantics and making 
sure after stopping no state update happens
-   * and {@link #sync} is not again scheduled in {@link #executor} and if 
there was a previously scheduled sync before
-   * stopping, it is skipped and also, it is used to ensure that duplicate 
syncs are never scheduled in the executor.
+   * Lock to implement proper start-then-stop semantics. Used to ensure that:
+   * <ul>
+   * <li>No state update happens after {@link #stop()}.</li>
+   * <li>No sync is scheduled after {@link #stop()}.</li>
+   * <li>Any pending sync is skipped when {@link #stop()} has been called.</li>
+   * <li>Duplicate syncs are not scheduled on the executor.</li>
+   * </ul>
    */
   private final LifecycleLock startStopLock = new LifecycleLock();
 
@@ -141,7 +145,7 @@ public class ChangeRequestHttpSyncer<T>
         startStopLock.exitStart();
       }
 
-      sinceSyncerStart.restart();
+      safeRestart(sinceSyncerStart);
       addNextSyncToWorkQueue();
     }
   }
@@ -220,21 +224,18 @@ public class ChangeRequestHttpSyncer<T>
    */
   public boolean isSyncedSuccessfully()
   {
-    if (consecutiveFailedAttemptCount > 0) {
-      return false;
-    } else {
-      return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
-    }
+    return consecutiveFailedAttemptCount <= 0
+           && sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
   }
 
-  private void sync()
+  private void sendSyncRequest()
   {
     if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
       log.info("Skipping sync for server[%s] as syncer has not started yet.", 
logIdentity);
       return;
     }
 
-    sinceLastSyncRequest.restart();
+    safeRestart(sinceLastSyncRequest);
 
     try {
       final String req = getRequestString();
@@ -270,7 +271,7 @@ public class ChangeRequestHttpSyncer<T>
                   final int responseCode = responseHandler.getStatus();
                   if (responseCode == HttpServletResponse.SC_NO_CONTENT) {
                     log.debug("Received NO CONTENT from server[%s]", 
logIdentity);
-                    sinceLastSyncSuccess.restart();
+                    safeRestart(sinceLastSyncSuccess);
                     return;
                   } else if (responseCode != HttpServletResponse.SC_OK) {
                     handleFailure(new ISE("Received sync response [%d]", 
responseCode));
@@ -306,7 +307,7 @@ public class ChangeRequestHttpSyncer<T>
                     log.info("Server[%s] synced successfully.", logIdentity);
                   }
 
-                  sinceLastSyncSuccess.restart();
+                  safeRestart(sinceLastSyncSuccess);
                 }
                 catch (Exception ex) {
                   markServerUnstableAndAlert(ex, "Processing Response");
@@ -390,9 +391,9 @@ public class ChangeRequestHttpSyncer<T>
               RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
           );
           log.info("Scheduling next sync for server[%s] in [%d] millis.", 
logIdentity, delayMillis);
-          executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS);
+          executor.schedule(this::sendSyncRequest, delayMillis, 
TimeUnit.MILLISECONDS);
         } else {
-          executor.execute(this::sync);
+          executor.execute(this::sendSyncRequest);
         }
       }
       catch (Throwable th) {
@@ -410,10 +411,17 @@ public class ChangeRequestHttpSyncer<T>
     }
   }
 
+  private void safeRestart(Stopwatch stopwatch)
+  {
+    synchronized (startStopLock) {
+      stopwatch.restart();
+    }
+  }
+
   private void markServerUnstableAndAlert(Throwable throwable, String action)
   {
     if (consecutiveFailedAttemptCount++ == 0) {
-      sinceUnstable.restart();
+      safeRestart(sinceUnstable);
     }
 
     final long unstableSeconds = getUnstableTimeMillis() / 1000;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to