This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.3 by this push:
     new 8b1c8e4  [Backport] Fix three bugs with segment publishing. (#6155) (#6187)
8b1c8e4 is described below

commit 8b1c8e4c1552646b5723e5e443edff6043d65cbe
Author: Jonathan Wei <jon-...@users.noreply.github.com>
AuthorDate: Fri Aug 17 18:58:27 2018 -0700

    [Backport] Fix three bugs with segment publishing. (#6155) (#6187)
    
    * [Backport] Fix three bugs with segment publishing. (#6155)
    
    * Fix KafkaIndexTask
---
 .../io/druid/indexing/kafka/KafkaIndexTask.java    |  4 +-
 .../io/druid/indexing/common/task/IndexTask.java   |  2 +-
 .../IndexerMetadataStorageCoordinator.java         |  5 +-
 .../IndexerSQLMetadataStorageCoordinator.java      | 20 ++++++--
 .../realtime/appenderator/AppenderatorImpl.java    | 11 ++++-
 .../appenderator/BaseAppenderatorDriver.java       | 25 ++++------
 .../TransactionalSegmentPublisher.java             |  8 +++-
 .../appenderator/BatchAppenderatorDriverTest.java  |  5 +-
 .../StreamAppenderatorDriverFailTest.java          | 56 +++++++++++-----------
 .../appenderator/StreamAppenderatorDriverTest.java | 13 ++---
 10 files changed, 83 insertions(+), 66 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0..258e203 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -1126,7 +1126,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
         log.info("Publishing with isTransaction[%s].", 
ioConfig.isUseTransaction());
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
 
      // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
@@ -2332,7 +2332,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
         log.info("Publishing with isTransaction[%s].", useTransaction);
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
     }
   }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index b9ec270..a13cf73 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -646,7 +646,7 @@ public class IndexTask extends AbstractTask
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     try (
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 17917d0..e2fccea 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -122,9 +122,12 @@ public interface IndexerMetadataStorageCoordinator
   *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
   *                      involve a metadata transaction
   *
-   * @return segment publish result indicating transaction success or failure, and set of segments actually published
+   * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+   * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+   * it must throw an exception instead.
   *
   * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+   * @throws RuntimeException         if the state of metadata storage after this call is unknown
    */
   SegmentPublishResult announceHistoricalSegments(
       Set<DataSegment> segments,
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 746e359..3cf9157 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -326,7 +326,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       }
     }
 
-    final AtomicBoolean txnFailure = new AtomicBoolean(false);
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
 
     try {
       return connector.retryTransaction(
@@ -338,6 +338,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 final TransactionStatus transactionStatus
             ) throws Exception
             {
+              // Set definitelyNotUpdated back to false upon retrying.
+              definitelyNotUpdated.set(false);
+
               final Set<DataSegment> inserted = Sets.newHashSet();
 
               if (startMetadata != null) {
@@ -349,8 +352,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 );
 
                 if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                  // Metadata was definitely not updated.
                   transactionStatus.setRollbackOnly();
-                  txnFailure.set(true);
+                  definitelyNotUpdated.set(true);
 
                   if (result == DataSourceMetadataUpdateResult.FAILURE) {
                     throw new RuntimeException("Aborting transaction!");
@@ -374,9 +378,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       );
     }
     catch (CallbackFailedException e) {
-      if (txnFailure.get()) {
-        return new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false);
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail();
       } else {
+        // Must throw exception if we are not sure if we updated or not.
         throw e;
       }
     }
@@ -904,7 +909,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
   * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
    *
-   * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+   * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+   * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+   * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+   * achieve its own guarantee.
+   *
+   * @throws RuntimeException if state is unknown after this call
    */
   protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
       final Handle handle,
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 3f0f0ba..5b73d40 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -625,8 +625,15 @@ public class AppenderatorImpl implements Appenderator
     try {
       if (descriptorFile.exists()) {
         // Already pushed.
-        log.info("Segment[%s] already pushed.", identifier);
-        return objectMapper.readValue(descriptorFile, DataSegment.class);
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.info("Pushing segment[%s] again with new unique path.", identifier);
+        } else {
+          log.info("Segment[%s] already pushed.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
       }
 
       log.info("Pushing merged index for segment[%s].", identifier);
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 1f04236..788c1c1 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -554,38 +554,33 @@ public abstract class BaseAppenderatorDriver implements Closeable
               final boolean published = publisher.publishSegments(
                   ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                   metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
-              );
+              ).isSuccess();
 
               if (published) {
                 log.info("Published segments.");
               } else {
-                log.info("Transaction failure while publishing segments, 
checking if someone else beat us to it.");
+                log.info("Transaction failure while publishing segments, 
removing them from deep storage "
+                         + "and checking if someone else beat us to 
publishing.");
+
+                
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
                 final Set<SegmentIdentifier> segmentsIdentifiers = 
segmentsAndMetadata
                     .getSegments()
                     .stream()
                     .map(SegmentIdentifier::fromDataSegment)
                     .collect(Collectors.toSet());
+
                 if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
                                       .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-                  log.info(
-                      "Removing our segments from deep storage because someone else already published them: %s",
-                      segmentsAndMetadata.getSegments()
-                  );
-                  segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
                   log.info("Our segments really do exist, awaiting handoff.");
                 } else {
-                  throw new ISE("Failed to publish segments[%s]", 
segmentsAndMetadata.getSegments());
+                  throw new ISE("Failed to publish segments.");
                 }
               }
             }
             catch (Exception e) {
-              log.warn(
-                  "Removing segments from deep storage after failed publish: %s",
-                  segmentsAndMetadata.getSegments()
-              );
-              segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
+              // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+              log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
               throw Throwables.propagate(e);
             }
           }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 359708a..44326c1 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.realtime.appenderator;
 
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -30,11 +31,14 @@ public interface TransactionalSegmentPublisher
   /**
   * Publish segments, along with some commit metadata, in a single transaction.
    *
-   * @return true if segments were published, false if they were not published due to txn failure with the metadata
+   * @return publish result that indicates if segments were published or not. If it is unclear
+   * if the segments were published or not, this method must throw an exception. The behavior is similar to
+   * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
   *
   * @throws IOException if there was an I/O error when publishing
+   * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
    */
-  boolean publishSegments(
+  SegmentPublishResult publishSegments(
       Set<DataSegment> segments,
       @Nullable Object commitMetadata
   ) throws IOException;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 2fd0f08..92d1ec1 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
   }
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 9535bb3..e9f20ff 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage(
-        "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binar [...]
+    expectedException.expectMessage("Failed to publish segments.");
 
     testFailDuringPublishInternal(false);
   }
@@ -279,31 +278,34 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
       Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
 
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
-
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
+    if (!failWithException) {
+      // Should only kill segments if there was _no_ exception.
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+    }
 
     EasyMock.replay(dataSegmentKiller);
 
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index aff1e02..763ed0e 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -359,14 +361,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return new TransactionalSegmentPublisher()
-    {
-      @Override
-      public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
-      {
-        return true;
-      }
-    };
+    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -375,7 +370,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
       if (failWithException) {
         throw new RuntimeException("test");
       }
-      return false;
+      return SegmentPublishResult.fail();
     };
   }
 

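For readers following the diff above, the core contract change is that TransactionalSegmentPublisher now returns a SegmentPublishResult instead of a boolean, and callers such as BaseAppenderatorDriver only delete pushed segments when the publish definitely failed, while an exception (unknown outcome) leaves them in place. Below is a minimal, hypothetical sketch of that contract, not part of the commit, using the io.druid types shown in the diff; the helper names tryCommit and killQuietly are illustrative only.

    import java.io.IOException;
    import java.util.Set;

    import io.druid.indexing.overlord.SegmentPublishResult;
    import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
    import io.druid.timeline.DataSegment;

    class PublishContractSketch
    {
      // A publisher may return fail() only when it is certain the metadata transaction did not commit;
      // if the outcome is unknown (e.g. a lost connection mid-commit), it must throw instead.
      static TransactionalSegmentPublisher examplePublisher()
      {
        return (segments, commitMetadata) -> {
          if (tryCommit(segments, commitMetadata)) {           // hypothetical commit helper
            return new SegmentPublishResult(segments, true);   // definitely committed
          } else {
            return SegmentPublishResult.fail();                // definitely not committed
          }
        };
      }

      // Caller side, mirroring the new BaseAppenderatorDriver behavior: clean up only on a definite failure.
      static void publishOrCleanUp(TransactionalSegmentPublisher publisher, Set<DataSegment> segments) throws IOException
      {
        final boolean published = publisher.publishSegments(segments, null).isSuccess();
        if (!published) {
          // Transaction definitely failed, so the pushed files can be removed from deep storage.
          segments.forEach(PublishContractSketch::killQuietly); // hypothetical cleanup helper
        }
        // If publishSegments throws, nothing is deleted here: the segments may in fact have been published.
      }

      private static boolean tryCommit(Set<DataSegment> segments, Object commitMetadata) { return true; }

      private static void killQuietly(DataSegment segment) { }
    }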

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
