This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/28.0.0 by this push:
new 701b9af6c90 Filter pending segments upgraded with transactional
replace (#15169) (#15234)
701b9af6c90 is described below
commit 701b9af6c90e535e320cf4d8b4491d0032209106
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Oct 24 00:37:18 2023 +0530
Filter pending segments upgraded with transactional replace (#15169)
(#15234)
* Filter pending segments upgraded with transactional replace
* Push sequence name filter to metadata query
---
.../MaterializedViewSupervisor.java | 6 ++
.../MaterializedViewSupervisorSpecTest.java | 16 +---
.../actions/SegmentTransactionalReplaceAction.java | 5 +-
.../overlord/supervisor/SupervisorManager.java | 10 +++
.../supervisor/SeekableStreamSupervisor.java | 22 +++++
.../SeekableStreamSupervisorStateTest.java | 42 ++++++++++
.../TestIndexerMetadataStorageCoordinator.java | 5 +-
.../IndexerMetadataStorageCoordinator.java | 5 +-
.../overlord/supervisor/NoopSupervisorSpec.java | 7 ++
.../indexing/overlord/supervisor/Supervisor.java | 6 ++
.../IndexerSQLMetadataStorageCoordinator.java | 76 ++++++++++++++---
.../IndexerSQLMetadataStorageCoordinatorTest.java | 98 ++++++++++++++++++++++
12 files changed, 273 insertions(+), 25 deletions(-)
diff --git
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index d0a035be17c..af5c0fbe95a 100644
---
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -296,6 +296,12 @@ public class MaterializedViewSupervisor implements
Supervisor
throw new UnsupportedOperationException("Compute Lag Stats not supported
in MaterializedViewSupervisor");
}
+ @Override
+ public Set<String> getActiveRealtimeSequencePrefixes()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public int getActiveTaskGroupsCount()
{
diff --git
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index fe84d358baf..365fb1751ea 100644
---
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -203,19 +203,11 @@ public class MaterializedViewSupervisorSpecTest
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
Assert.assertNull(autoscaler);
- try {
- supervisor.computeLagStats();
- }
- catch (Exception e) {
- Assert.assertTrue(e instanceof UnsupportedOperationException);
- }
+ Assert.assertThrows(UnsupportedOperationException.class, () ->
supervisor.computeLagStats());
- try {
- int count = supervisor.getActiveTaskGroupsCount();
- }
- catch (Exception e) {
- Assert.assertTrue(e instanceof UnsupportedOperationException);
- }
+ Assert.assertThrows(UnsupportedOperationException.class, () ->
supervisor.getActiveTaskGroupsCount());
+
+ Assert.assertThrows(UnsupportedOperationException.class, () ->
supervisor.getActiveRealtimeSequencePrefixes());
Callable<Integer> noop = new Callable<Integer>() {
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index e6ad0426e46..aaa62db90a7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -145,8 +145,11 @@ public class SegmentTransactionalReplaceAction implements
TaskAction<SegmentPubl
return;
}
+ final Set<String> activeRealtimeSequencePrefixes
+ =
supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradedPendingSegments =
-
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
+ toolbox.getIndexerMetadataStorageCoordinator()
+ .upgradePendingSegmentsOverlappingWith(segments,
activeRealtimeSequencePrefixes);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index df454c1011a..207ff56f28f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -39,6 +39,7 @@ import
org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -110,6 +111,15 @@ public class SupervisorManager
return Optional.absent();
}
+ public Set<String> getActiveRealtimeSequencePrefixes(String
activeSupervisorId)
+ {
+ if (supervisors.containsKey(activeSupervisorId)) {
+ return
supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
public Optional<SupervisorSpec> getSupervisorSpec(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1d05169e3fb..a0ec6d809eb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1093,6 +1093,28 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
+ /**
+ * The base sequence name of a seekable stream task group is used as a
prefix of the sequence names
+ * of pending segments published by it.
+ * This method can be used to identify the active pending segments for a
datasource
+ * by checking if the sequence name begins with any of the active realtime
sequence prefixes returned by this method
+ * @return the set of base sequence names of both active and pending
completion task groups.
+ */
+ @Override
+ public Set<String> getActiveRealtimeSequencePrefixes()
+ {
+ final Set<String> activeBaseSequences = new HashSet<>();
+ for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+ activeBaseSequences.add(taskGroup.baseSequenceName);
+ }
+ for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values())
{
+ for (TaskGroup taskGroup : taskGroupList) {
+ activeBaseSequences.add(taskGroup.baseSequenceName);
+ }
+ }
+ return activeBaseSequences;
+ }
+
public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 819a6baacd8..7a587bb196e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1313,6 +1313,48 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
}
+ @Test
+ public void testGetActiveRealtimeSequencePrefixes()
+ {
+ EasyMock.expect(spec.isSuspended()).andReturn(false);
+
+ replayAll();
+
+ final TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor();
+
+ // Set up two actively reading task groups and one pending completion task group, each serving one partition.
+ supervisor.getIoConfig().setTaskCount(3);
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("0"),
+ ImmutableMap.of("0", "5"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition("1"),
+ ImmutableMap.of("1", "6"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2"),
+ ImmutableSet.of()
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ supervisor.getTaskGroupIdForPartition("2"),
+ ImmutableMap.of("2", "100"),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task3"),
+ ImmutableSet.of()
+ );
+
+ Assert.assertEquals(3,
supervisor.getActiveRealtimeSequencePrefixes().size());
+ }
+
@Test
public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws
InterruptedException, IOException
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 108833422c8..143a74c72cb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -238,7 +238,10 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
}
@Override
- public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
+ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegmentsOverlappingWith(
+ Set<DataSegment> replaceSegments,
+ Set<String> activeBaseSequenceNames
+ )
{
return Collections.emptyMap();
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 7c6710048a1..34a55574dce 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -347,10 +347,13 @@ public interface IndexerMetadataStorageCoordinator
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
+ * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active
and pending completion task groups
+ * of the supervisor (if any) for this
datasource
* @return Map from originally allocated pending segment to its new upgraded
ID.
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegmentsOverlappingWith(
- Set<DataSegment> replaceSegments
+ Set<DataSegment> replaceSegments,
+ Set<String> activeRealtimeSequencePrefixes
);
/**
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index e733ef6c233..20c10253386 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -31,6 +31,7 @@ import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -185,6 +186,12 @@ public class NoopSupervisorSpec implements SupervisorSpec
{
return -1;
}
+
+ @Override
+ public Set<String> getActiveRealtimeSequencePrefixes()
+ {
+ return Collections.emptySet();
+ }
};
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index bcfc5ebe819..9d940bc55b6 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -29,6 +29,7 @@ import
org.apache.druid.segment.incremental.ParseExceptionReport;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public interface Supervisor
{
@@ -93,4 +94,9 @@ public interface Supervisor
LagStats computeLagStats();
int getActiveTaskGroupsCount();
+
+ /**
+ * @return active sequence prefixes for reading and pending completion task
groups of a seekable stream supervisor
+ */
+ Set<String> getActiveRealtimeSequencePrefixes();
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c654d5e229b..612f712c1bb 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -231,7 +231,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector,
dbTables, jsonMapper)
- .retrieveUnusedSegments(dataSource,
Collections.singletonList(interval), limit)) {
+ .retrieveUnusedSegments(dataSource,
Collections.singletonList(interval), limit)) {
return ImmutableList.copyOf(iterator);
}
}
@@ -258,9 +258,62 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
/**
* Fetches all the pending segments, whose interval overlaps with the given
- * search interval from the metadata store. Returns a Map from the
- * pending segment ID to the sequence name.
+ * search interval and whose sequence_name begins with one of the
prefixes in sequenceNamePrefixFilter
+ * from the metadata store. Returns a Map from the pending segment ID to the
sequence name.
*/
+ @VisibleForTesting
+ Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval,
+ final Set<String> sequenceNamePrefixFilter
+ ) throws IOException
+ {
+ if (sequenceNamePrefixFilter.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final List<String> sequenceNamePrefixes = new
ArrayList<>(sequenceNamePrefixFilter);
+ final List<String> sequenceNamePrefixConditions = new ArrayList<>();
+ for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+ sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE
:prefix%d)", i));
+ }
+
+ String sql = "SELECT sequence_name, payload"
+ + " FROM " + dbTables.getPendingSegmentsTable()
+ + " WHERE dataSource = :dataSource"
+ + " AND start < :end"
+ + StringUtils.format(" AND %1$send%1$s > :start",
connector.getQuoteString())
+ + " AND ( " + String.join(" OR ",
sequenceNamePrefixConditions) + " )";
+
+ Query<Map<String, Object>> query = handle.createQuery(sql)
+ .bind("dataSource", dataSource)
+ .bind("start",
interval.getStart().toString())
+ .bind("end",
interval.getEnd().toString());
+
+ for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+ query.bind(StringUtils.format("prefix%d", i),
sequenceNamePrefixes.get(i) + "%");
+ }
+
+ final ResultIterator<PendingSegmentsRecord> dbSegments =
+ query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
+ .iterator();
+
+ final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName =
new HashMap<>();
+ while (dbSegments.hasNext()) {
+ PendingSegmentsRecord record = dbSegments.next();
+ final SegmentIdWithShardSpec identifier =
jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
+
+ if (interval.overlaps(identifier.getInterval())) {
+ pendingSegmentToSequenceName.put(identifier, record.sequenceName);
+ }
+ }
+
+ dbSegments.close();
+
+ return pendingSegmentToSequenceName;
+ }
+
private Map<SegmentIdWithShardSpec, String>
getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
@@ -620,7 +673,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegmentsOverlappingWith(
- Set<DataSegment> replaceSegments
+ Set<DataSegment> replaceSegments,
+ Set<String> activeRealtimeSequencePrefixes
)
{
if (replaceSegments.isEmpty()) {
@@ -639,7 +693,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final String datasource =
replaceSegments.iterator().next().getDataSource();
return connector.retryWithHandle(
- handle -> upgradePendingSegments(handle, datasource,
replaceIntervalToMaxId)
+ handle -> upgradePendingSegments(handle, datasource,
replaceIntervalToMaxId, activeRealtimeSequencePrefixes)
);
}
@@ -658,7 +712,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec>
upgradePendingSegments(
Handle handle,
String datasource,
- Map<Interval, DataSegment> replaceIntervalToMaxId
+ Map<Interval, DataSegment> replaceIntervalToMaxId,
+ Set<String> activeRealtimeSequencePrefixes
) throws IOException
{
final Map<SegmentCreateRequest, SegmentIdWithShardSpec>
newPendingSegmentVersions = new HashMap<>();
@@ -673,12 +728,13 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
int currentPartitionNumber =
maxSegmentId.getShardSpec().getPartitionNum();
final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
- = getPendingSegmentsForIntervalWithHandle(handle, datasource,
replaceInterval);
+ = getPendingSegmentsForIntervalWithHandle(handle, datasource,
replaceInterval, activeRealtimeSequencePrefixes);
for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
: overlappingPendingSegments.entrySet()) {
final SegmentIdWithShardSpec pendingSegmentId =
overlappingPendingSegment.getKey();
final String pendingSegmentSequence =
overlappingPendingSegment.getValue();
+
if (shouldUpgradePendingSegment(pendingSegmentId,
pendingSegmentSequence, replaceInterval, replaceVersion)) {
// Ensure unique sequence_name_prev_id_sha1 by setting
// sequence_prev_id -> pendingSegmentId
@@ -1281,9 +1337,9 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new
HashMap<>();
for (DataSegment segment : overlappingSegments) {
overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v ->
new HashSet<>())
- .add(segment.getInterval());
+ .add(segment.getInterval());
overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i
-> new HashSet<>())
- .add(segment);
+ .add(segment);
}
final Set<DataSegment> upgradedSegments = new HashSet<>();
@@ -2275,7 +2331,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// Not in the desired start state.
return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
"Inconsistent metadata state. This can happen if you update input
topic in a spec without changing " +
- "the supervisor name. Stored state: [%s], Target state: [%s].",
+ "the supervisor name. Stored state: [%s], Target state: [%s].",
oldCommitMetadataFromDb,
startMetadata
));
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0512357ffc1..a8fc9e923c5 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -464,6 +466,44 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
+ private Boolean
insertPendingSegmentAndSequenceName(Pair<SegmentIdWithShardSpec, String>
pendingSegmentSequenceName)
+ {
+ final SegmentIdWithShardSpec pendingSegment =
pendingSegmentSequenceName.lhs;
+ final String sequenceName = pendingSegmentSequenceName.rhs;
+ final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
+ return derbyConnector.retryWithHandle(
+ handle -> {
+ handle.createStatement(
+ StringUtils.format(
+ "INSERT INTO %1$s (id, dataSource, created_date,
start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ + "sequence_name_prev_id_sha1, payload) "
+ + "VALUES (:id, :dataSource, :created_date, :start,
:end, :sequence_name, :sequence_prev_id, "
+ + ":sequence_name_prev_id_sha1, :payload)",
+ table,
+ derbyConnector.getQuoteString()
+ )
+ )
+ .bind("id", pendingSegment.toString())
+ .bind("dataSource", pendingSegment.getDataSource())
+ .bind("created_date", DateTimes.nowUtc().toString())
+ .bind("start",
pendingSegment.getInterval().getStart().toString())
+ .bind("end", pendingSegment.getInterval().getEnd().toString())
+ .bind("sequence_name", sequenceName)
+ .bind("sequence_prev_id", pendingSegment.toString())
+ .bind("sequence_name_prev_id_sha1",
BaseEncoding.base16().encode(
+ Hashing.sha1()
+ .newHasher()
+ .putLong((long) pendingSegment.hashCode() *
sequenceName.hashCode())
+ .hash()
+ .asBytes()
+ ))
+ .bind("payload", mapper.writeValueAsBytes(pendingSegment))
+ .execute();
+ return true;
+ }
+ );
+ }
+
private Map<String, String> getSegmentsCommittedDuringReplaceTask(String
taskId)
{
final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
@@ -2554,6 +2594,64 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
+ @Test
+ public void testGetPendingSegmentsForIntervalWithSequencePrefixes()
+ {
+ Pair<SegmentIdWithShardSpec, String> validIntervalValidSequence = Pair.of(
+ SegmentIdWithShardSpec.fromDataSegment(defaultSegment),
+ "validLOL"
+ );
+ insertPendingSegmentAndSequenceName(validIntervalValidSequence);
+
+ Pair<SegmentIdWithShardSpec, String> validIntervalInvalidSequence =
Pair.of(
+ SegmentIdWithShardSpec.fromDataSegment(defaultSegment2),
+ "invalidRandom"
+ );
+ insertPendingSegmentAndSequenceName(validIntervalInvalidSequence);
+
+ Pair<SegmentIdWithShardSpec, String> invalidIntervalvalidSequence =
Pair.of(
+ SegmentIdWithShardSpec.fromDataSegment(existingSegment1),
+ "validStuff"
+ );
+ insertPendingSegmentAndSequenceName(invalidIntervalvalidSequence);
+
+ Pair<SegmentIdWithShardSpec, String> twentyFifteenWithAnotherValidSequence
= Pair.of(
+ new SegmentIdWithShardSpec(
+ existingSegment1.getDataSource(),
+ Intervals.of("2015/2016"),
+ "1970-01-01",
+ new NumberedShardSpec(1, 0)
+ ),
+ "alsoValidAgain"
+ );
+ insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence);
+
+ Pair<SegmentIdWithShardSpec, String> twentyFifteenWithInvalidSequence =
Pair.of(
+ new SegmentIdWithShardSpec(
+ existingSegment1.getDataSource(),
+ Intervals.of("2015/2016"),
+ "1970-01-01",
+ new NumberedShardSpec(2, 0)
+ ),
+ "definitelyInvalid"
+ );
+ insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence);
+
+
+ final Map<SegmentIdWithShardSpec, String> expected = new HashMap<>();
+ expected.put(validIntervalValidSequence.lhs,
validIntervalValidSequence.rhs);
+ expected.put(twentyFifteenWithAnotherValidSequence.lhs,
twentyFifteenWithAnotherValidSequence.rhs);
+
+ final Map<SegmentIdWithShardSpec, String> actual =
+ derbyConnector.retryWithHandle(handle ->
coordinator.getPendingSegmentsForIntervalWithHandle(
+ handle,
+ defaultSegment.getDataSource(),
+ defaultSegment.getInterval(),
+ ImmutableSet.of("valid", "alsoValid")
+ ));
+ Assert.assertEquals(expected, actual);
+ }
+
@Test
public void testRetrieveUsedSegmentsAndCreatedDates()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]