This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.3 by this push:
     new 8b1c8e4  [Backport] Fix three bugs with segment publishing. (#6155) (#6187)
8b1c8e4 is described below

commit 8b1c8e4c1552646b5723e5e443edff6043d65cbe
Author: Jonathan Wei <jon-...@users.noreply.github.com>
AuthorDate: Fri Aug 17 18:58:27 2018 -0700

    [Backport] Fix three bugs with segment publishing. (#6155) (#6187)

    * [Backport] Fix three bugs with segment publishing. (#6155)

    * Fix KafkaIndexTask
---
 .../io/druid/indexing/kafka/KafkaIndexTask.java    |  4 +-
 .../io/druid/indexing/common/task/IndexTask.java   |  2 +-
 .../IndexerMetadataStorageCoordinator.java         |  5 +-
 .../IndexerSQLMetadataStorageCoordinator.java      | 20 ++++++--
 .../realtime/appenderator/AppenderatorImpl.java    | 11 ++++-
 .../appenderator/BaseAppenderatorDriver.java       | 25 ++++------
 .../TransactionalSegmentPublisher.java             |  8 +++-
 .../appenderator/BatchAppenderatorDriverTest.java  |  5 +-
 .../StreamAppenderatorDriverFailTest.java          | 56 +++++++++++-----------
 .../appenderator/StreamAppenderatorDriverTest.java | 13 ++---
 10 files changed, 83 insertions(+), 66 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0..258e203 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -1126,7 +1126,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
           log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
 
-          return toolbox.getTaskActionClient().submit(action).isSuccess();
+          return toolbox.getTaskActionClient().submit(action);
         };
 
     // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
@@ -2332,7 +2332,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
       log.info("Publishing with isTransaction[%s].", useTransaction);
 
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
   }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index b9ec270..a13cf73 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -646,7 +646,7 @@ public class IndexTask extends AbstractTask
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     try (
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 17917d0..e2fccea 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -122,9 +122,12 @@ public interface IndexerMetadataStorageCoordinator
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
    *                      involve a metadata transaction
    *
-   * @return segment publish result indicating transaction success or failure, and set of segments actually published
+   * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+   * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+   * it must throw an exception instead.
    *
    * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+   * @throws RuntimeException if the state of metadata storage after this call is unknown
    */
   SegmentPublishResult announceHistoricalSegments(
       Set<DataSegment> segments,
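The strengthened javadoc above pins down a subtle contract: announceHistoricalSegments may report failure only when the transaction definitely did not happen; if the outcome is unknown, it must throw. A minimal caller-side sketch of that contract (cleanUp and the local variable names are hypothetical, not part of this patch):

    try {
      final SegmentPublishResult result =
          coordinator.announceHistoricalSegments(segments, startMetadata, endMetadata);
      if (!result.isSuccess()) {
        // Definite failure: the transaction did not commit, so cleanup is safe.
        cleanUp(segments);
      }
    }
    catch (RuntimeException e) {
      // Unknown outcome: the transaction may have committed, so do NOT clean up.
      throw e;
    }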
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 746e359..3cf9157 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -326,7 +326,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       }
     }
 
-    final AtomicBoolean txnFailure = new AtomicBoolean(false);
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
 
     try {
       return connector.retryTransaction(
@@ -338,6 +338,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 final TransactionStatus transactionStatus
             ) throws Exception
             {
+              // Set definitelyNotUpdated back to false upon retrying.
+              definitelyNotUpdated.set(false);
+
               final Set<DataSegment> inserted = Sets.newHashSet();
 
               if (startMetadata != null) {
@@ -349,8 +352,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 );
 
                 if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                  // Metadata was definitely not updated.
                   transactionStatus.setRollbackOnly();
-                  txnFailure.set(true);
+                  definitelyNotUpdated.set(true);
 
                   if (result == DataSourceMetadataUpdateResult.FAILURE) {
                     throw new RuntimeException("Aborting transaction!");
@@ -374,9 +378,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       );
     }
     catch (CallbackFailedException e) {
-      if (txnFailure.get()) {
-        return new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false);
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail();
       } else {
+        // Must throw exception if we are not sure if we updated or not.
         throw e;
       }
     }
@@ -904,7 +909,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
    * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
    *
-   * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+   * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+   * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+   * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+   * achieve its own guarantee.
+   *
+   * @throws RuntimeException if state is unknown after this call
    */
   protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
       final Handle handle,
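The coordinator change above hinges on a reset-on-retry flag: because connector.retryTransaction may run the callback several times, a "definitely not updated" flag set by one attempt must be cleared at the start of the next, or a stale value could mask an unknown-state failure as a definite one. A condensed sketch of the pattern (retryTransaction, tryUpdateMetadata, and insertSegments are hypothetical stand-ins for the real connector and handle logic):

    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
    try {
      return retryTransaction(handle -> {
        // Clear the flag on every attempt; a prior attempt may have set it.
        definitelyNotUpdated.set(false);
        if (!tryUpdateMetadata(handle)) {
          definitelyNotUpdated.set(true);
          throw new RuntimeException("Aborting transaction!");
        }
        return insertSegments(handle);
      });
    }
    catch (RuntimeException e) {
      if (definitelyNotUpdated.get()) {
        return SegmentPublishResult.fail(); // Safe: the update definitely did not happen.
      }
      throw e; // Unknown state: must propagate, never report a definite failure.
    }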
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 3f0f0ba..5b73d40 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -625,8 +625,15 @@ public class AppenderatorImpl implements Appenderator
     try {
       if (descriptorFile.exists()) {
         // Already pushed.
-        log.info("Segment[%s] already pushed.", identifier);
-        return objectMapper.readValue(descriptorFile, DataSegment.class);
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.info("Pushing segment[%s] again with new unique path.", identifier);
+        } else {
+          log.info("Segment[%s] already pushed.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
       }
 
       log.info("Pushing merged index for segment[%s].", identifier);
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 1f04236..788c1c1 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -554,38 +554,33 @@ public abstract class BaseAppenderatorDriver implements Closeable
             final boolean published = publisher.publishSegments(
                 ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                 metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
-            );
+            ).isSuccess();
 
             if (published) {
               log.info("Published segments.");
             } else {
-              log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
+              log.info("Transaction failure while publishing segments, removing them from deep storage "
+                       + "and checking if someone else beat us to publishing.");
+
+              segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
               final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
                   .getSegments()
                   .stream()
                   .map(SegmentIdentifier::fromDataSegment)
                   .collect(Collectors.toSet());
+
               if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
                                     .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-                log.info(
-                    "Removing our segments from deep storage because someone else already published them: %s",
-                    segmentsAndMetadata.getSegments()
-                );
-                segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
                 log.info("Our segments really do exist, awaiting handoff.");
               } else {
-                throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
+                throw new ISE("Failed to publish segments.");
               }
             }
           }
           catch (Exception e) {
-            log.warn(
-                "Removing segments from deep storage after failed publish: %s",
-                segmentsAndMetadata.getSegments()
-            );
-            segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
+            // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+            log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
             throw Throwables.propagate(e);
           }
         }
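The driver change encodes the same rule on the cleanup side: segments pushed to deep storage may be deleted only after a definite transaction failure (an unsuccessful result from the publisher), never from the exception path, where the commit state is unknown. Roughly (verifySomeoneElsePublished is a hypothetical condensation of the usedSegmentChecker logic above):

    try {
      if (publisher.publishSegments(segments, callerMetadata).isSuccess()) {
        log.info("Published segments.");
      } else {
        // Definite failure: safe to remove our copies, then check whether
        // another task already published identical segments.
        segments.forEach(dataSegmentKiller::killQuietly);
        verifySomeoneElsePublished(segments); // throws ISE if nobody did
      }
    }
    catch (Exception e) {
      // Unknown state: keep the segments in deep storage.
      throw Throwables.propagate(e);
    }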
+ log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments()); throw Throwables.propagate(e); } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java index 359708a..44326c1 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.appenderator; +import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -30,11 +31,14 @@ public interface TransactionalSegmentPublisher /** * Publish segments, along with some commit metadata, in a single transaction. * - * @return true if segments were published, false if they were not published due to txn failure with the metadata + * @return publish result that indicates if segments were published or not. If it is unclear + * if the segments were published or not, this method must throw an exception. The behavior is similar to + * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments. * * @throws IOException if there was an I/O error when publishing + * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason */ - boolean publishSegments( + SegmentPublishResult publishSegments( Set<DataSegment> segments, @Nullable Object commitMetadata ) throws IOException; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java index 2fd0f08..92d1ec1 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java @@ -24,13 +24,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; +import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.granularity.Granularities; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence; -import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; +import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator; import io.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -194,6 +195,6 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport static TransactionalSegmentPublisher makeOkPublisher() { - return (segments, commitMetadata) -> true; + return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true); } } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 9535bb3..e9f20ff 100644 --- 
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 2fd0f08..92d1ec1 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
   }
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 9535bb3..e9f20ff 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage(
-        "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binar [...]
+    expectedException.expectMessage("Failed to publish segments.");
 
     testFailDuringPublishInternal(false);
   }
@@ -279,31 +278,34 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
       Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
 
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
-
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
+    if (!failWithException) {
+      // Should only kill segments if there was _no_ exception.
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+    }
 
     EasyMock.replay(dataSegmentKiller);
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index aff1e02..763ed0e 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -359,14 +361,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return new TransactionalSegmentPublisher()
-    {
-      @Override
-      public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
-      {
-        return true;
-      }
-    };
+    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -375,7 +370,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
       if (failWithException) {
         throw new RuntimeException("test");
       }
-      return false;
+      return SegmentPublishResult.fail();
     };
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org