This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.3 by this push:
new 8b1c8e4 [Backport] Fix three bugs with segment publishing. (#6155) (#6187)
8b1c8e4 is described below
commit 8b1c8e4c1552646b5723e5e443edff6043d65cbe
Author: Jonathan Wei <[email protected]>
AuthorDate: Fri Aug 17 18:58:27 2018 -0700
[Backport] Fix three bugs with segment publishing. (#6155) (#6187)
* [Backport] Fix three bugs with segment publishing. (#6155)
* Fix KafkaIndexTask
---
.../io/druid/indexing/kafka/KafkaIndexTask.java | 4 +-
.../io/druid/indexing/common/task/IndexTask.java | 2 +-
.../IndexerMetadataStorageCoordinator.java | 5 +-
.../IndexerSQLMetadataStorageCoordinator.java | 20 ++++++--
.../realtime/appenderator/AppenderatorImpl.java | 11 ++++-
.../appenderator/BaseAppenderatorDriver.java | 25 ++++------
.../TransactionalSegmentPublisher.java | 8 +++-
.../appenderator/BatchAppenderatorDriverTest.java | 5 +-
.../StreamAppenderatorDriverFailTest.java | 56 +++++++++++-----------
.../appenderator/StreamAppenderatorDriverTest.java | 13 ++---
10 files changed, 83 insertions(+), 66 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0..258e203 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -1126,7 +1126,7 @@ public class KafkaIndexTask extends AbstractTask
implements ChatHandler
log.info("Publishing with isTransaction[%s].",
ioConfig.isUseTransaction());
- return toolbox.getTaskActionClient().submit(action).isSuccess();
+ return toolbox.getTaskActionClient().submit(action);
};
// Supervised kafka tasks are killed by KafkaSupervisor if they are
stuck during publishing segments or waiting
@@ -2332,7 +2332,7 @@ public class KafkaIndexTask extends AbstractTask
implements ChatHandler
log.info("Publishing with isTransaction[%s].", useTransaction);
- return toolbox.getTaskActionClient().submit(action).isSuccess();
+ return toolbox.getTaskActionClient().submit(action);
};
}
}
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index b9ec270..a13cf73 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -646,7 +646,7 @@ public class IndexTask extends AbstractTask
final TransactionalSegmentPublisher publisher = (segments, commitMetadata)
-> {
final SegmentTransactionalInsertAction action = new
SegmentTransactionalInsertAction(segments);
- return toolbox.getTaskActionClient().submit(action).isSuccess();
+ return toolbox.getTaskActionClient().submit(action);
};
try (
diff --git
a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 17917d0..e2fccea 100644
---
a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -122,9 +122,12 @@ public interface IndexerMetadataStorageCoordinator
* {@link DataSourceMetadata#plus(DataSourceMetadata)}.
If null, this insert will not
* involve a metadata transaction
*
- * @return segment publish result indicating transaction success or failure,
and set of segments actually published
+ * @return segment publish result indicating transaction success or failure,
and set of segments actually published.
+ * This method must only return a failure code if it is sure that the
transaction did not happen. If it is not sure,
+ * it must throw an exception instead.
*
* @throws IllegalArgumentException if startMetadata and endMetadata are not
either both null or both non-null
+ * @throws RuntimeException if the state of metadata storage after
this call is unknown
*/
SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
diff --git
a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 746e359..3cf9157 100644
---
a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -326,7 +326,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
}
- final AtomicBoolean txnFailure = new AtomicBoolean(false);
+ final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
try {
return connector.retryTransaction(
@@ -338,6 +338,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final TransactionStatus transactionStatus
) throws Exception
{
+ // Set definitelyNotUpdated back to false upon retrying.
+ definitelyNotUpdated.set(false);
+
final Set<DataSegment> inserted = Sets.newHashSet();
if (startMetadata != null) {
@@ -349,8 +352,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+ // Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
- txnFailure.set(true);
+ definitelyNotUpdated.set(true);
if (result == DataSourceMetadataUpdateResult.FAILURE) {
throw new RuntimeException("Aborting transaction!");
@@ -374,9 +378,10 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
catch (CallbackFailedException e) {
- if (txnFailure.get()) {
- return new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false);
+ if (definitelyNotUpdated.get()) {
+ return SegmentPublishResult.fail();
} else {
+ // Must throw exception if we are not sure if we updated or not.
throw e;
}
}
@@ -904,7 +909,12 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
* @param endMetadata dataSource metadata post-insert will have this
endMetadata merged in with
* {@link DataSourceMetadata#plus(DataSourceMetadata)}
*
- * @return true if dataSource metadata was updated from matching
startMetadata to matching endMetadata
+ * @return SUCCESS if dataSource metadata was updated from matching
startMetadata to matching endMetadata, FAILURE or
+ * TRY_AGAIN if it definitely was not updated. This guarantee is meant to
help
+ * {@link #announceHistoricalSegments(Set, DataSourceMetadata,
DataSourceMetadata)}
+ * achieve its own guarantee.
+ *
+ * @throws RuntimeException if state is unknown after this call
*/
protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
final Handle handle,
diff --git
a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 3f0f0ba..5b73d40 100644
---
a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++
b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -625,8 +625,15 @@ public class AppenderatorImpl implements Appenderator
try {
if (descriptorFile.exists()) {
// Already pushed.
- log.info("Segment[%s] already pushed.", identifier);
- return objectMapper.readValue(descriptorFile, DataSegment.class);
+
+ if (useUniquePath) {
+ // Don't reuse the descriptor, because the caller asked for a unique
path. Leave the old one as-is, since
+ // it might serve some unknown purpose.
+ log.info("Pushing segment[%s] again with new unique path.",
identifier);
+ } else {
+ log.info("Segment[%s] already pushed.", identifier);
+ return objectMapper.readValue(descriptorFile, DataSegment.class);
+ }
}
log.info("Pushing merged index for segment[%s].", identifier);
diff --git
a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 1f04236..788c1c1 100644
---
a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -554,38 +554,33 @@ public abstract class BaseAppenderatorDriver implements
Closeable
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
metadata == null ? null : ((AppenderatorDriverMetadata)
metadata).getCallerMetadata()
- );
+ ).isSuccess();
if (published) {
log.info("Published segments.");
} else {
- log.info("Transaction failure while publishing segments,
checking if someone else beat us to it.");
+ log.info("Transaction failure while publishing segments,
removing them from deep storage "
+ + "and checking if someone else beat us to
publishing.");
+
+
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
final Set<SegmentIdentifier> segmentsIdentifiers =
segmentsAndMetadata
.getSegments()
.stream()
.map(SegmentIdentifier::fromDataSegment)
.collect(Collectors.toSet());
+
if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
- log.info(
- "Removing our segments from deep storage because someone
else already published them: %s",
- segmentsAndMetadata.getSegments()
- );
-
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
log.info("Our segments really do exist, awaiting handoff.");
} else {
- throw new ISE("Failed to publish segments[%s]",
segmentsAndMetadata.getSegments());
+ throw new ISE("Failed to publish segments.");
}
}
}
catch (Exception e) {
- log.warn(
- "Removing segments from deep storage after failed publish:
%s",
- segmentsAndMetadata.getSegments()
- );
-
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
+ // Must not remove segments here, we aren't sure if our
transaction succeeded or not.
+ log.warn(e, "Failed publish, not removing segments: %s",
segmentsAndMetadata.getSegments());
throw Throwables.propagate(e);
}
}
diff --git
a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 359708a..44326c1 100644
---
a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++
b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -19,6 +19,7 @@
package io.druid.segment.realtime.appenderator;
+import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@@ -30,11 +31,14 @@ public interface TransactionalSegmentPublisher
/**
* Publish segments, along with some commit metadata, in a single
transaction.
*
- * @return true if segments were published, false if they were not published
due to txn failure with the metadata
+ * @return publish result that indicates if segments were published or not.
If it is unclear
+ * if the segments were published or not, this method must throw an
exception. The behavior is similar to
+ * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
*
* @throws IOException if there was an I/O error when publishing
+ * @throws RuntimeException if we cannot tell if the segments were published
or not, for some other reason
*/
- boolean publishSegments(
+ SegmentPublishResult publishSegments(
Set<DataSegment> segments,
@Nullable Object commitMetadata
) throws IOException;
diff --git
a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 2fd0f08..92d1ec1 100644
---
a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++
b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.segment.loading.DataSegmentKiller;
import
io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import
io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import
io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ public class BatchAppenderatorDriverTest extends
EasyMockSupport
static TransactionalSegmentPublisher makeOkPublisher()
{
- return (segments, commitMetadata) -> true;
+ return (segments, commitMetadata) -> new
SegmentPublishResult(ImmutableSet.of(), true);
}
}
diff --git
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 9535bb3..e9f20ff 100644
---
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
- expectedException.expectMessage(
- "Failed to publish segments[[DataSegment{size=0,
shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[],
dimensions=[], version='abc123', loadSpec={},
interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo',
binaryVersion='0'}, DataSegment{size=0,
shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[],
dimensions=[], version='abc123', loadSpec={},
interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo',
binar [...]
+ expectedException.expectMessage("Failed to publish segments.");
testFailDuringPublishInternal(false);
}
@@ -279,31 +278,34 @@ public class StreamAppenderatorDriverFailTest extends
EasyMockSupport
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier,
false, true).isOk());
}
- dataSegmentKiller.killQuietly(new DataSegment(
- "foo",
- Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
- "abc123",
- ImmutableMap.of(),
- ImmutableList.of(),
- ImmutableList.of(),
- new NumberedShardSpec(0, 0),
- 0,
- 0
- ));
- EasyMock.expectLastCall().once();
-
- dataSegmentKiller.killQuietly(new DataSegment(
- "foo",
- Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
- "abc123",
- ImmutableMap.of(),
- ImmutableList.of(),
- ImmutableList.of(),
- new NumberedShardSpec(0, 0),
- 0,
- 0
- ));
- EasyMock.expectLastCall().once();
+ if (!failWithException) {
+ // Should only kill segments if there was _no_ exception.
+ dataSegmentKiller.killQuietly(new DataSegment(
+ "foo",
+ Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+ "abc123",
+ ImmutableMap.of(),
+ ImmutableList.of(),
+ ImmutableList.of(),
+ new NumberedShardSpec(0, 0),
+ 0,
+ 0
+ ));
+ EasyMock.expectLastCall().once();
+
+ dataSegmentKiller.killQuietly(new DataSegment(
+ "foo",
+ Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+ "abc123",
+ ImmutableMap.of(),
+ ImmutableList.of(),
+ ImmutableList.of(),
+ new NumberedShardSpec(0, 0),
+ 0,
+ 0
+ ));
+ EasyMock.expectLastCall().once();
+ }
EasyMock.replay(dataSegmentKiller);
diff --git
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index aff1e02..763ed0e 100644
---
a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++
b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -359,14 +361,7 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
static TransactionalSegmentPublisher makeOkPublisher()
{
- return new TransactionalSegmentPublisher()
- {
- @Override
- public boolean publishSegments(Set<DataSegment> segments, Object
commitMetadata) throws IOException
- {
- return true;
- }
- };
+ return (segments, commitMetadata) -> new
SegmentPublishResult(Collections.emptySet(), true);
}
static TransactionalSegmentPublisher makeFailingPublisher(boolean
failWithException)
@@ -375,7 +370,7 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
if (failWithException) {
throw new RuntimeException("test");
}
- return false;
+ return SegmentPublishResult.fail();
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]