This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 99762cb [7338] Allow Reloading Segments with Multiple Threads (#7893)
99762cb is described below
commit 99762cbb89e6bc3c2609667771a04b4c70e0056d
Author: Prashant Pandey <[email protected]>
AuthorDate: Sat Dec 25 06:30:50 2021 +0530
[7338] Allow Reloading Segments with Multiple Threads (#7893)
---
.../core/data/manager/InstanceDataManager.java | 7 ++-
.../pinot/core/util/SegmentRefreshSemaphore.java | 60 ++++++++++++++++++++++
.../starter/helix/HelixInstanceDataManager.java | 40 ++++++++++-----
.../helix/SegmentMessageHandlerFactory.java | 52 ++++++-------------
4 files changed, 107 insertions(+), 52 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 94546c0..14d98b3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -29,6 +29,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
+import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -93,9 +94,11 @@ public interface InstanceDataManager {
throws Exception;
/**
- * Reloads all segments in a table.
+ * Reloads all segments of a table.
+ * @param segmentRefreshSemaphore semaphore to control concurrent segment
reloads/refresh
*/
- void reloadAllSegments(String tableNameWithType, boolean forceDownload)
+ void reloadAllSegments(String tableNameWithType, boolean forceDownload,
+ SegmentRefreshSemaphore segmentRefreshSemaphore)
throws Exception;
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java
new file mode 100644
index 0000000..c2ed11c
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.util;
+
+import java.util.concurrent.Semaphore;
+import org.slf4j.Logger;
+
+
+/**
+ * Wrapper class for semaphore used to control concurrent segment
reload/refresh
+ */
+public class SegmentRefreshSemaphore {
+
+ private final Semaphore _semaphore;
+
+ public SegmentRefreshSemaphore(int permits, boolean fair) {
+ if (permits > 0) {
+ _semaphore = new Semaphore(permits, fair);
+ } else {
+ _semaphore = null;
+ }
+ }
+
+ public void acquireSema(String segmentName, Logger logger)
+ throws InterruptedException {
+ if (_semaphore != null) {
+ long startTime = System.currentTimeMillis();
+ logger.info("Waiting for lock to refresh : {}, queue-length: {}",
segmentName,
+ _semaphore.getQueueLength());
+ _semaphore.acquire();
+ logger.info("Acquired lock to refresh segment: {} (lock-time={}ms,
queue-length={})", segmentName,
+ System.currentTimeMillis() - startTime, _semaphore.getQueueLength());
+ } else {
+ logger.info("Locking of refresh threads disabled (segment: {})",
segmentName);
+ }
+ }
+
+ public void releaseSema() {
+ if (_semaphore != null) {
+ _semaphore.release();
+ }
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 947ab4f..9d1af9c 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -27,7 +27,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -44,6 +48,7 @@ import
org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
import
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
+import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
@@ -197,7 +202,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
LOGGER.info("Reloading single segment: {} in table: {}", segmentName,
tableNameWithType);
SegmentMetadata segmentMetadata = getSegmentMetadata(tableNameWithType,
segmentName);
if (segmentMetadata == null) {
- LOGGER.info("Segment metadata is null. Skip reloading segment: {} in
table: {}", segmentName, tableNameWithType);
+ LOGGER.info("Segment metadata is null. Skip reloading segment: {} in
table: {}", segmentName,
+ tableNameWithType);
return;
}
@@ -212,33 +218,41 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
}
@Override
- public void reloadAllSegments(String tableNameWithType, boolean
forceDownload) {
+ public void reloadAllSegments(String tableNameWithType, boolean
forceDownload,
+ SegmentRefreshSemaphore segmentRefreshSemaphore)
+ throws Exception {
LOGGER.info("Reloading all segments in table: {}", tableNameWithType);
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Preconditions.checkNotNull(tableConfig);
-
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableNameWithType);
-
List<String> failedSegments = new ArrayList<>();
- Exception sampleException = null;
List<SegmentMetadata> segmentsMetadata =
getAllSegmentsMetadata(tableNameWithType);
- for (SegmentMetadata segmentMetadata : segmentsMetadata) {
+ ExecutorService workers = Executors.newCachedThreadPool();
+ final AtomicReference<Exception> sampleException = new AtomicReference<>();
+ //calling thread hasn't acquired any permit so we don't reload any
segments using it.
+ CompletableFuture.allOf(segmentsMetadata.stream().map(segmentMetadata ->
CompletableFuture.runAsync(() -> {
+ String segmentName = segmentMetadata.getName();
try {
- reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema,
forceDownload);
+ segmentRefreshSemaphore.acquireSema(segmentMetadata.getName(), LOGGER);
+ try {
+ reloadSegment(tableNameWithType, segmentMetadata, tableConfig,
schema, forceDownload);
+ } finally {
+ segmentRefreshSemaphore.releaseSema();
+ }
} catch (Exception e) {
- String segmentName = segmentMetadata.getName();
LOGGER.error("Caught exception while reloading segment: {} in table:
{}", segmentName, tableNameWithType, e);
failedSegments.add(segmentName);
- sampleException = e;
+ sampleException.set(e);
}
- }
+ }, workers)).toArray(CompletableFuture[]::new)).get();
- if (sampleException != null) {
+ workers.shutdownNow();
+
+ if (sampleException.get() != null) {
throw new RuntimeException(
String.format("Failed to reload %d/%d segments: %s in table: %s",
failedSegments.size(),
- segmentsMetadata.size(), failedSegments, tableNameWithType),
sampleException);
+ segmentsMetadata.size(), failedSegments, tableNameWithType),
sampleException.get());
}
-
LOGGER.info("Reloaded all segments in table: {}", tableNameWithType);
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index ad4a386..d00a558 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.server.starter.helix;
-import java.util.concurrent.Semaphore;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
@@ -30,6 +29,7 @@ import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,39 +39,14 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
// We only allow limited number of segments refresh/reload happen at the
same time
// The reason for that is segment refresh/reload will temporarily use
double-sized memory
- private final Semaphore _refreshThreadSemaphore;
private final InstanceDataManager _instanceDataManager;
private final ServerMetrics _metrics;
+ private final SegmentRefreshSemaphore _segmentRefreshSemaphore;
public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager,
ServerMetrics metrics) {
_instanceDataManager = instanceDataManager;
_metrics = metrics;
- int maxParallelRefreshThreads =
instanceDataManager.getMaxParallelRefreshThreads();
- if (maxParallelRefreshThreads > 0) {
- _refreshThreadSemaphore = new Semaphore(maxParallelRefreshThreads, true);
- } else {
- _refreshThreadSemaphore = null;
- }
- }
-
- private void acquireSema(String context, Logger logger)
- throws InterruptedException {
- if (_refreshThreadSemaphore != null) {
- long startTime = System.currentTimeMillis();
- logger.info("Waiting for lock to refresh : {}, queue-length: {}",
context,
- _refreshThreadSemaphore.getQueueLength());
- _refreshThreadSemaphore.acquire();
- logger.info("Acquired lock to refresh segment: {} (lock-time={}ms,
queue-length={})", context,
- System.currentTimeMillis() - startTime,
_refreshThreadSemaphore.getQueueLength());
- } else {
- LOGGER.info("Locking of refresh threads disabled (segment: {})",
context);
- }
- }
-
- private void releaseSema() {
- if (_refreshThreadSemaphore != null) {
- _refreshThreadSemaphore.release();
- }
+ _segmentRefreshSemaphore = new
SegmentRefreshSemaphore(instanceDataManager.getMaxParallelRefreshThreads(),
true);
}
// Called each time a message is received.
@@ -113,7 +88,7 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
HelixTaskResult result = new HelixTaskResult();
_logger.info("Handling message: {}", _message);
try {
- acquireSema(_segmentName, _logger);
+ _segmentRefreshSemaphore.acquireSema(_segmentName, _logger);
// The number of retry times depends on the retry count in Constants.
_instanceDataManager.addOrReplaceSegment(_tableNameWithType,
_segmentName);
result.setSuccess(true);
@@ -121,7 +96,7 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
_metrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REFRESH_FAILURES, 1);
Utils.rethrowException(e);
} finally {
- releaseSema();
+ _segmentRefreshSemaphore.releaseSema();
}
return result;
}
@@ -143,14 +118,19 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
_logger.info("Handling message: {}", _message);
try {
if (_segmentName.equals("")) {
- acquireSema("ALL", _logger);
// NOTE: the method aborts if any segment reload encounters an
unhandled exception,
- // and can lead to inconsistent state across segments
- _instanceDataManager.reloadAllSegments(_tableNameWithType,
_forceDownload);
+ // and can lead to inconsistent state across segments.
+ //we don't acquire any permit here as they'll be acquired by worker
threads later
+ _instanceDataManager.reloadAllSegments(_tableNameWithType,
_forceDownload,
+ _segmentRefreshSemaphore);
} else {
// Reload one segment
- acquireSema(_segmentName, _logger);
- _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName,
_forceDownload);
+ _segmentRefreshSemaphore.acquireSema(_segmentName, _logger);
+ try {
+ _instanceDataManager.reloadSegment(_tableNameWithType,
_segmentName, _forceDownload);
+ } finally {
+ _segmentRefreshSemaphore.releaseSema();
+ }
}
helixTaskResult.setSuccess(true);
} catch (Throwable e) {
@@ -159,8 +139,6 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
// (without any corresponding logs to indicate failure!) in the
callable path
throw new RuntimeException(
"Caught exception while reloading segment: " + _segmentName + " in
table: " + _tableNameWithType, e);
- } finally {
- releaseSema();
}
return helixTaskResult;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]