This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c5bf4e7  update insert pending segments logic to synchronous (#6336)
c5bf4e7 is described below

commit c5bf4e750328f5338c3b4ad3e4051ce46886ab73
Author: Faxian Zhao <[email protected]>
AuthorDate: Tue Oct 23 10:48:20 2018 +0800

    update insert pending segments logic to synchronous (#6336)
    
    * 1. Mysql default transaction isolation is REPEATABLE_READ, treat it as 
READ_COMMITTED will reduce insert id conflict.
    2. Add an index to 'dataSource used end' is work well for the most of 
scenarios(get recently segments), and it will speed up sync add pending 
segments in DB.
    3. 'select and insert' is not need within transaction.
    
    * Use TaskLockbox.doInCriticalSection instead of synchronized syntax to 
speed up insert pending segments.
    
    * fix typo for NullPointerException
---
 .../common/actions/SegmentAllocateAction.java      | 35 +++++++++++++++++-----
 .../IndexerSQLMetadataStorageCoordinator.java      | 10 +++----
 .../druid/metadata/SQLMetadataConnector.java       |  3 +-
 3 files changed, 33 insertions(+), 15 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index 26c7bb7..98ad25d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -23,9 +23,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.LockResult;
 import org.apache.druid.java.util.common.IAE;
@@ -267,14 +269,31 @@ public class SegmentAllocateAction implements 
TaskAction<SegmentIdentifier>
     }
 
     if (lockResult.isOk()) {
-      final SegmentIdentifier identifier = 
toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
-          dataSource,
-          sequenceName,
-          previousSegmentId,
-          tryInterval,
-          lockResult.getTaskLock().getVersion(),
-          skipSegmentLineageCheck
-      );
+      final SegmentIdentifier identifier;
+      try {
+        identifier = toolbox.getTaskLockbox().doInCriticalSection(
+            task,
+            ImmutableList.of(tryInterval),
+            CriticalAction.<SegmentIdentifier>builder()
+                .onValidLocks(
+                    () -> 
toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
+                        dataSource,
+                        sequenceName,
+                        previousSegmentId,
+                        tryInterval,
+                        lockResult.getTaskLock().getVersion(),
+                        skipSegmentLineageCheck
+                    )
+                ).onInvalidLocks(
+                    () -> null
+                    )
+                .build()
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
       if (identifier != null) {
         return identifier;
       } else {
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index d021e4b..ccd2373 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -387,11 +387,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     Preconditions.checkNotNull(interval, "interval");
     Preconditions.checkNotNull(maxVersion, "maxVersion");
 
-    return connector.retryTransaction(
-        new TransactionCallback<SegmentIdentifier>()
+    return connector.retryWithHandle(
+        new HandleCallback<SegmentIdentifier>()
         {
           @Override
-          public SegmentIdentifier inTransaction(Handle handle, 
TransactionStatus transactionStatus) throws Exception
+          public SegmentIdentifier withHandle(Handle handle) throws Exception
           {
             return skipSegmentLineageCheck ?
                    allocatePendingSegment(handle, dataSource, sequenceName, 
interval, maxVersion) :
@@ -404,9 +404,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                        maxVersion
                    );
           }
-        },
-        ALLOCATE_SEGMENT_QUIET_TRIES,
-        SQLMetadataConnector.DEFAULT_MAX_TRIES
+        }
     );
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 2d8e3e5..ddd33d1 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -33,6 +33,7 @@ import org.skife.jdbi.v2.Batch;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionIsolationLevel;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.exceptions.DBIException;
 import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
@@ -144,7 +145,7 @@ public abstract class SQLMetadataConnector implements 
MetadataStorageConnector
   public <T> T retryTransaction(final TransactionCallback<T> callback, final 
int quietTries, final int maxTries)
   {
     try {
-      return RetryUtils.retry(() -> getDBI().inTransaction(callback), 
shouldRetry, quietTries, maxTries);
+      return RetryUtils.retry(() -> 
getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback), 
shouldRetry, quietTries, maxTries);
     }
     catch (Exception e) {
       throw Throwables.propagate(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to