This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9d77ef04f40 Cleanup usages of stopwatch (#16478)
9d77ef04f40 is described below
commit 9d77ef04f40d77b2fc1c17f4558845fcebe9492c
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon May 27 23:08:46 2024 +0530
Cleanup usages of stopwatch (#16478)
Changes:
- Remove the `synchronized` modifier from `Stopwatch` methods
- Access stopwatch methods in `ChangeRequestHttpSyncer` inside a lock
---
.../apache/druid/java/util/common/Stopwatch.java | 17 ++--
.../apache/druid/discovery/DruidLeaderClient.java | 4 +-
.../druid/metadata/SqlSegmentsMetadataManager.java | 102 ++++++++-------------
.../metadata/SegmentSchemaBackFillQueue.java | 29 +++---
.../coordination/ChangeRequestHttpSyncer.java | 40 ++++----
5 files changed, 89 insertions(+), 103 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java
index 2d941828a0e..ab48fd7f9b0 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java
@@ -25,11 +25,8 @@ import org.joda.time.Duration;
import java.util.concurrent.TimeUnit;
/**
- * Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
- * <p>
- * Thread safety has been limited to the start/stop methods for now as they are
- * the only ones that can throw an exception in an illegal state and are thus
- * vulnerable to race conditions.
+ * Wrapper over {@link com.google.common.base.Stopwatch} to provide some
utility
+ * methods such as {@link #millisElapsed()}, {@link #restart()}, {@link
#hasElapsed(Duration)}.
*/
public class Stopwatch
{
@@ -55,17 +52,17 @@ public class Stopwatch
this.delegate = delegate;
}
- public synchronized void start()
+ public void start()
{
delegate.start();
}
- public synchronized void stop()
+ public void stop()
{
delegate.stop();
}
- public synchronized void reset()
+ public void reset()
{
delegate.reset();
}
@@ -73,12 +70,12 @@ public class Stopwatch
/**
* Invokes {@code reset().start()} on the underlying {@link
com.google.common.base.Stopwatch}.
*/
- public synchronized void restart()
+ public void restart()
{
delegate.reset().start();
}
- public synchronized boolean isRunning()
+ public boolean isRunning()
{
return delegate.isRunning();
}
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 4ca1441f6f2..28e48b29aa4 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -67,9 +67,9 @@ public class DruidLeaderClient
private final String leaderRequestPath;
- private LifecycleLock lifecycleLock = new LifecycleLock();
+ private final LifecycleLock lifecycleLock = new LifecycleLock();
private DruidNodeDiscovery druidNodeDiscovery;
- private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
+ private final AtomicReference<String> currentKnownLeader = new
AtomicReference<>();
public DruidLeaderClient(
HttpClient httpClient,
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 590a61d78d0..209deb41e7e 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -71,12 +71,9 @@ import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1036,42 +1033,26 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment table.");
- // some databases such as PostgreSQL require auto-commit turned off
+ // Some databases such as PostgreSQL require auto-commit turned off
// to stream results back, enabling transactions disables auto-commit
- //
// setting connection to read-only will allow some database such as MySQL
// to automatically use read-only transaction mode, further optimizing the
query
final List<DataSegment> segments = connector.inReadOnlyTransaction(
- new TransactionCallback<List<DataSegment>>()
- {
- @Override
- public List<DataSegment> inTransaction(Handle handle,
TransactionStatus status)
- {
- return handle
- .createQuery(StringUtils.format("SELECT payload FROM %s WHERE
used=true", getSegmentsTable()))
- .setFetchSize(connector.getStreamingFetchSize())
- .map(
- new ResultSetMapper<DataSegment>()
- {
- @Override
- public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLException
- {
- try {
- DataSegment segment =
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
- return replaceWithExistingSegmentIfPresent(segment);
- }
- catch (IOException e) {
- log.makeAlert(e, "Failed to read segment from
db.").emit();
- // If one entry in database is corrupted doPoll()
should continue to work overall. See
- // filter by `Objects::nonNull` below in this method.
- return null;
- }
- }
- }
- )
- .list();
- }
- }
+ (handle, status) -> handle
+ .createQuery(StringUtils.format("SELECT payload FROM %s WHERE
used=true", getSegmentsTable()))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map((index, r, ctx) -> {
+ try {
+ DataSegment segment =
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+ return replaceWithExistingSegmentIfPresent(segment);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read segment from db.").emit();
+ // If one entry in database is corrupted doPoll() should
continue to work overall. See
+ // filter by `Objects::nonNull` below in this method.
+ return null;
+ }
+ }).list()
);
Preconditions.checkNotNull(
@@ -1082,11 +1063,13 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
if (segments.isEmpty()) {
log.info("No segments found in the database!");
} else {
- log.info("Polled and found [%,d] segments in the database in [%,d] ms.",
segments.size(), stopwatch.millisElapsed());
+ log.info(
+ "Polled and found [%,d] segments in the database in [%,d] ms.",
+ segments.size(), stopwatch.millisElapsed()
+ );
}
- stopwatch.restart();
- createDatasourcesSnapshot(stopwatch, segments);
+ createDatasourcesSnapshot(segments);
}
private void doPollSegmentAndSchema()
@@ -1157,25 +1140,18 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
(handle, status) -> {
handle.createQuery(schemaPollQuery)
.setFetchSize(connector.getStreamingFetchSize())
- .map(
- new ResultSetMapper<Void>()
- {
- @Override
- public Void map(int index, ResultSet r, StatementContext
ctx) throws SQLException
- {
- try {
- schemaMapBuilder.put(
- r.getString("fingerprint"),
- jsonMapper.readValue(r.getBytes("payload"),
SchemaPayload.class)
- );
- }
- catch (IOException e) {
- log.makeAlert(e, "Failed to read schema from
db.").emit();
- }
- return null;
- }
- })
- .list();
+ .map((index, r, ctx) -> {
+ try {
+ schemaMapBuilder.put(
+ r.getString("fingerprint"),
+ jsonMapper.readValue(r.getBytes("payload"),
SchemaPayload.class)
+ );
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read schema from db.").emit();
+ }
+ return null;
+ }).list();
segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
return null;
@@ -1195,19 +1171,17 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
log.info("No segments found in the database!");
} else {
log.info(
- "Polled and found total [%,d] segments and [%,d] schema in the
database in [%,d] ms.",
- segments.size(),
- schemaMap.size(),
- stopwatch.millisElapsed()
+ "Polled and found [%,d] segments and [%,d] schemas in the database
in [%,d] ms.",
+ segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
- stopwatch.restart();
- createDatasourcesSnapshot(stopwatch, segments);
+ createDatasourcesSnapshot(segments);
}
- private void createDatasourcesSnapshot(Stopwatch stopwatch,
List<DataSegment> segments)
+ private void createDatasourcesSnapshot(List<DataSegment> segments)
{
+ final Stopwatch stopwatch = Stopwatch.createStarted();
// dataSourcesSnapshot is updated only here and the DataSourcesSnapshot
object is immutable. If data sources or
// segments are marked as used or unused directly (via markAs...() methods
in SegmentsMetadataManager), the
// dataSourcesSnapshot can become invalid until the next database poll.
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
index 66ce9ed4bde..7855e11da37 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
@@ -159,13 +160,11 @@ public class SegmentSchemaBackFillQueue
return;
}
- Stopwatch stopwatch = Stopwatch.createStarted();
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Backfilling segment schema. Queue size is [%s].", queue.size());
- log.info("Backfilling segment schema. Queue size is [%s]", queue.size());
-
- int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
-
- Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
+ final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
+ final Map<String, List<SegmentSchemaMetadataPlus>> polled = new
HashMap<>();
for (int i = 0; i < itemsToProcess; i++) {
SegmentSchemaMetadataPlus item = queue.poll();
if (item != null) {
@@ -175,21 +174,29 @@ public class SegmentSchemaBackFillQueue
for (Map.Entry<String, List<SegmentSchemaMetadataPlus>> entry :
polled.entrySet()) {
try {
-
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(),
entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
+ segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
+ entry.getKey(),
+ entry.getValue(),
+ CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+ );
+
// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()
- .setDimension("dataSource",
entry.getKey())
-
.setMetric("metadatacache/backfill/count", entry.getValue().size())
+ .setDimension(DruidMetrics.DATASOURCE,
entry.getKey())
+ .setMetric("metadatacache/backfill/count",
entry.getValue().size())
);
}
catch (Exception e) {
- log.error(e, "Exception persisting schema and updating segments table
for datasource [%s].", entry.getKey());
+ log.error(e, "Exception persisting schema and updating segments table
for datasource[%s].", entry.getKey());
}
}
-
emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time",
stopwatch.millisElapsed()));
+ emitter.emit(
+ ServiceMetricEvent.builder()
+ .setMetric("metadatacache/backfill/time",
stopwatch.millisElapsed())
+ );
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
index 2d256b68806..ef14b901b6c 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
@@ -82,9 +82,13 @@ public class ChangeRequestHttpSyncer<T>
private final CountDownLatch initializationLatch = new CountDownLatch(1);
/**
- * This lock is used to ensure proper start-then-stop semantics and making
sure after stopping no state update happens
- * and {@link #sync} is not again scheduled in {@link #executor} and if
there was a previously scheduled sync before
- * stopping, it is skipped and also, it is used to ensure that duplicate
syncs are never scheduled in the executor.
+ * Lock to implement proper start-then-stop semantics. Used to ensure that:
+ * <ul>
+ * <li>No state update happens after {@link #stop()}.</li>
+ * <li>No sync is scheduled after {@link #stop()}.</li>
+ * <li>Any pending sync is skipped when {@link #stop()} has been called.</li>
+ * <li>Duplicate syncs are not scheduled on the executor.</li>
+ * </ul>
*/
private final LifecycleLock startStopLock = new LifecycleLock();
@@ -141,7 +145,7 @@ public class ChangeRequestHttpSyncer<T>
startStopLock.exitStart();
}
- sinceSyncerStart.restart();
+ safeRestart(sinceSyncerStart);
addNextSyncToWorkQueue();
}
}
@@ -220,21 +224,18 @@ public class ChangeRequestHttpSyncer<T>
*/
public boolean isSyncedSuccessfully()
{
- if (consecutiveFailedAttemptCount > 0) {
- return false;
- } else {
- return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
- }
+ return consecutiveFailedAttemptCount <= 0
+ && sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}
- private void sync()
+ private void sendSyncRequest()
{
if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
log.info("Skipping sync for server[%s] as syncer has not started yet.",
logIdentity);
return;
}
- sinceLastSyncRequest.restart();
+ safeRestart(sinceLastSyncRequest);
try {
final String req = getRequestString();
@@ -270,7 +271,7 @@ public class ChangeRequestHttpSyncer<T>
final int responseCode = responseHandler.getStatus();
if (responseCode == HttpServletResponse.SC_NO_CONTENT) {
log.debug("Received NO CONTENT from server[%s]",
logIdentity);
- sinceLastSyncSuccess.restart();
+ safeRestart(sinceLastSyncSuccess);
return;
} else if (responseCode != HttpServletResponse.SC_OK) {
handleFailure(new ISE("Received sync response [%d]",
responseCode));
@@ -306,7 +307,7 @@ public class ChangeRequestHttpSyncer<T>
log.info("Server[%s] synced successfully.", logIdentity);
}
- sinceLastSyncSuccess.restart();
+ safeRestart(sinceLastSyncSuccess);
}
catch (Exception ex) {
markServerUnstableAndAlert(ex, "Processing Response");
@@ -390,9 +391,9 @@ public class ChangeRequestHttpSyncer<T>
RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
);
log.info("Scheduling next sync for server[%s] in [%d] millis.",
logIdentity, delayMillis);
- executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS);
+ executor.schedule(this::sendSyncRequest, delayMillis,
TimeUnit.MILLISECONDS);
} else {
- executor.execute(this::sync);
+ executor.execute(this::sendSyncRequest);
}
}
catch (Throwable th) {
@@ -410,10 +411,17 @@ public class ChangeRequestHttpSyncer<T>
}
}
+ private void safeRestart(Stopwatch stopwatch)
+ {
+ synchronized (startStopLock) {
+ stopwatch.restart();
+ }
+ }
+
private void markServerUnstableAndAlert(Throwable throwable, String action)
{
if (consecutiveFailedAttemptCount++ == 0) {
- sinceUnstable.restart();
+ safeRestart(sinceUnstable);
}
final long unstableSeconds = getUnstableTimeMillis() / 1000;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]