This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c5bf4e7 update insert pending segments logic to synchronous (#6336)
c5bf4e7 is described below
commit c5bf4e750328f5338c3b4ad3e4051ce46886ab73
Author: Faxian Zhao <[email protected]>
AuthorDate: Tue Oct 23 10:48:20 2018 +0800
update insert pending segments logic to synchronous (#6336)
* 1. MySQL's default transaction isolation level is REPEATABLE_READ; using
READ_COMMITTED instead reduces insert id conflicts.
2. Adding an index on 'dataSource used end' works well for most
scenarios (fetching recent segments), and it speeds up the synchronous
insertion of pending segments in the DB.
3. The 'select and insert' does not need to run within a transaction.
* Use TaskLockbox.doInCriticalSection instead of a synchronized block to
speed up inserting pending segments.
* fix typo for NullPointerException
---
.../common/actions/SegmentAllocateAction.java | 35 +++++++++++++++++-----
.../IndexerSQLMetadataStorageCoordinator.java | 10 +++----
.../druid/metadata/SQLMetadataConnector.java | 3 +-
3 files changed, 33 insertions(+), 15 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index 26c7bb7..98ad25d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -23,9 +23,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.java.util.common.IAE;
@@ -267,14 +269,31 @@ public class SegmentAllocateAction implements
TaskAction<SegmentIdentifier>
}
if (lockResult.isOk()) {
- final SegmentIdentifier identifier =
toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
- dataSource,
- sequenceName,
- previousSegmentId,
- tryInterval,
- lockResult.getTaskLock().getVersion(),
- skipSegmentLineageCheck
- );
+ final SegmentIdentifier identifier;
+ try {
+ identifier = toolbox.getTaskLockbox().doInCriticalSection(
+ task,
+ ImmutableList.of(tryInterval),
+ CriticalAction.<SegmentIdentifier>builder()
+ .onValidLocks(
+ () ->
toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
+ dataSource,
+ sequenceName,
+ previousSegmentId,
+ tryInterval,
+ lockResult.getTaskLock().getVersion(),
+ skipSegmentLineageCheck
+ )
+ ).onInvalidLocks(
+ () -> null
+ )
+ .build()
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
if (identifier != null) {
return identifier;
} else {
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index d021e4b..ccd2373 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -387,11 +387,11 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "maxVersion");
- return connector.retryTransaction(
- new TransactionCallback<SegmentIdentifier>()
+ return connector.retryWithHandle(
+ new HandleCallback<SegmentIdentifier>()
{
@Override
- public SegmentIdentifier inTransaction(Handle handle,
TransactionStatus transactionStatus) throws Exception
+ public SegmentIdentifier withHandle(Handle handle) throws Exception
{
return skipSegmentLineageCheck ?
allocatePendingSegment(handle, dataSource, sequenceName,
interval, maxVersion) :
@@ -404,9 +404,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
maxVersion
);
}
- },
- ALLOCATE_SEGMENT_QUIET_TRIES,
- SQLMetadataConnector.DEFAULT_MAX_TRIES
+ }
);
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 2d8e3e5..ddd33d1 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -33,6 +33,7 @@ import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionIsolationLevel;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
@@ -144,7 +145,7 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
public <T> T retryTransaction(final TransactionCallback<T> callback, final
int quietTries, final int maxTries)
{
try {
- return RetryUtils.retry(() -> getDBI().inTransaction(callback),
shouldRetry, quietTries, maxTries);
+ return RetryUtils.retry(() ->
getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback),
shouldRetry, quietTries, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]