kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r806501122
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
.anyMatch(fetch -> (fetch != null && !fetch.isDone()));
}
+ /**
+ * Is costly and requires polling the shard to determine if it's empty
+ * @param stream to which shard belongs
+ * @param shardId of the closed shard
+ * @return if the closed shard is empty
Review comment:
Nit:
```suggestion
* @return true if the closed shard is empty, false otherwise
```
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -481,4 +535,47 @@ protected boolean supportsPartitionExpiration()
return new KinesisDataSourceMetadata(newSequences);
}
+
+ /**
+ * A shard is considered closed iff it has an ending sequence number.
+ *
+ * @param shard to be checked
+ * @return if shard is closed
+ */
+ private boolean isShardClosed(Shard shard)
+ {
+ return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
+ }
+
+ /**
+ * Checking if a shard is empty requires polling for records which is quite expensive
+ * Fortunately, the results can be cached for closed shards as no more records can be written to them
+ * Please use this method only if the info is absent from the cache
+ *
+ * @param stream to which the shard belongs
+ * @param shardId of the shard
+ * @return if the shard is empty
+ */
+ private boolean isClosedShardEmpty(String stream, String shardId)
+ {
+ return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId);
Review comment:
Nit: Since this is a single-statement method that is used in only one
place, we could inline it into the calling method. That might help readability.
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -667,28 +666,31 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
* This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
*
* @param stream name of stream
- *
- * @return Set of Shard ids
+ * @return Immutable set of shards
*/
+ public Set<Shard> getShards(String stream)
+ {
+ ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+ ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+ while (true) {
+ ListShardsResult result = kinesis.listShards(request);
+ shards.addAll(result.getShards());
+ String nextToken = result.getNextToken();
+ if (nextToken == null) {
+ return shards.build();
+ }
+ request = new ListShardsRequest().withNextToken(nextToken);
+ }
+ }
+
@Override
public Set<String> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
- final Set<String> retVal = new TreeSet<>();
- ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
- while (true) {
- ListShardsResult result = kinesis.listShards(request);
- retVal.addAll(result.getShards()
- .stream()
- .map(Shard::getShardId)
- .collect(Collectors.toList())
- );
- String nextToken = result.getNextToken();
- if (nextToken == null) {
- return retVal;
- }
- request = new ListShardsRequest().withNextToken(nextToken);
- }
+ return ImmutableSet.copyOf(getShards(stream).stream()
+ .map(shard -> shard.getShardId())
+ .collect(Collectors.toList())
Review comment:
Nit: Can we just collect to an immutable Set instead of collecting to a
List first and then converting to a Set?
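For instance, something along these lines. This is only a minimal sketch that
uses the plain JDK so it stands alone: `ShardIdExample`, `FakeShard` and
`toShardIds` are made-up names, and `FakeShard` stands in for the SDK's `Shard`.
With Guava 21+ the same shape should work with `ImmutableSet.toImmutableSet()`
as the collector.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-alone example; not the actual KinesisRecordSupplier code.
class ShardIdExample
{
  // Hypothetical stand-in for the AWS SDK Shard class.
  static class FakeShard
  {
    private final String shardId;

    FakeShard(String shardId)
    {
      this.shardId = shardId;
    }

    String getShardId()
    {
      return shardId;
    }
  }

  // Collects straight into an unmodifiable Set, skipping the intermediate List.
  static Set<String> toShardIds(List<FakeShard> shards)
  {
    return shards.stream()
                 .map(FakeShard::getShardId)
                 .collect(Collectors.toUnmodifiableSet());
  }
}
```

Note that `Collectors.toUnmodifiableSet()` needs Java 10 or later and rejects
null elements, so the Guava collector is probably the better fit for this class.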
##########
File path:
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
}
verifyAll();
}
+
+ @Test
+ public void testIsClosedShardEmpty()
+ {
+ AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+ KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+ recordsPerFetch,
+ 0,
+ 2,
+ false,
+ 100,
+ 5000,
+ 5000,
+ 60000,
+ 5,
+ true
+ );
+ Record record = new Record();
+ String shardId;
+
+ // No records and null iterator -> empty
+ shardId = "0";
+ isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+ Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+ // no records and non-null iterator -> non-empty
+ shardId = "1";
+ isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+ Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+ // non-empty records and null iterator -> non-empty
+ shardId = "2";
+ isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+ Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+ // non-empty records and non-null iterator -> non-empty
+ shardId = "3";
Review comment:
It doesn't seem like the `shardId` matters in these tests, because we
are doing an `EasyMock.reset(kinesis)` anyway.
Option 1: If distinct ids are not required, just use a single final `shardId`.
Option 2 (preferred): Use different `shardId`s but do not call reset on the
mock.
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
.anyMatch(fetch -> (fetch != null && !fetch.isDone()));
}
+ /**
+ * Is costly and requires polling the shard to determine if it's empty
Review comment:
Nit:
I don't think we are really polling anything here.
```suggestion
* Fetches records from the specified shard to determine if it is empty.
```
##########
File path:
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -339,9 +342,9 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
- new KinesisDataSourceMetadata(
- null
- )
+ new KinesisDataSourceMetadata(
Review comment:
Nit: If you are reformatting this, maybe just put the whole constructor
on a single line.
##########
File path:
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -1294,7 +1297,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
.times(1);
- EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
Review comment:
Nit: Please try to avoid formatting changes unless they are relevant to
the core set of changes.
##########
File path:
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -4885,6 +4907,135 @@ private void testShardMergePhaseThree(List<Task> phaseTwoTasks) throws Exception
Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
}
+ @Test
+ public void testUpdateClosedShardCache()
+ {
+ supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
+ supervisor.setupRecordSupplier();
+ supervisor.tryInit();
+ String stream = supervisor.getKinesisSupervisorSpec().getSource();
+ Shard openShard = EasyMock.mock(Shard.class);
+ Shard emptyClosedShard = EasyMock.mock(Shard.class);
+ Shard nonEmptyClosedShard = EasyMock.mock(Shard.class);
+ Set<Shard> activeShards;
+ Set<String> emptyClosedShardIds;
+ Set<String> nonEmptyClosedShardIds;
+
+ // ITERATION 0:
+ // active shards: an open shard, closed-empty shard and closed-nonEmpty shard
+ activeShards = getActiveShards(openShard, true,
+ emptyClosedShard, true,
+ nonEmptyClosedShard, true);
+
+ EasyMock.reset(supervisorRecordSupplier);
Review comment:
Rather than resetting the mock objects every time, you could do the
following:
1. Have some open shards, closed empty shards and closed non-empty shards
   (and maybe open empty ones too, just to be sure).
2. Set up the record supplier to return all of these when calling `getShards`.
3. Set up the record supplier to return the correct values for
   `isClosedShardEmpty` for each shard.
4. Call `supervisor.getIgnorablePartitionIds()` and verify the returned
   values.
5. For the call in step 4, verify the shard ids for which
   `isClosedShardEmpty` is called on the mock record supplier.
   (Optional step 5a: move some shards from open to closed, empty to
   non-empty, add new shards, etc.)
6. Call `supervisor.getIgnorablePartitionIds()` again and verify the
   returned values.
7. For the call in step 6, verify the shard ids for which
   `isClosedShardEmpty` is called on the mock record supplier.

I feel this would be easier to follow and more representative of what would
happen in a real scenario. It would also help you avoid resetting the mock
objects and avoid exposing the internal caches, which are currently only visible
for testing.
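To make the flow concrete, here is a rough sketch of steps 1 to 7 with
hand-rolled fakes instead of EasyMock, so that it runs on its own. Everything
in it is hypothetical (`ClosedShardCacheSketch`, `FakeRecordSupplier`,
`FakeSupervisor`, `secondCallChecks`), and it assumes that "ignorable" means
closed and empty. In the real test, EasyMock expectations with `.once()`
followed by `EasyMock.verify(...)` would play the role of the call log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-ins for illustration; not the actual Druid classes.
class ClosedShardCacheSketch
{
  // Hand-rolled fake record supplier that logs every emptiness check.
  static class FakeRecordSupplier
  {
    // shardId -> whether the closed shard is empty
    final Map<String, Boolean> closedShards = new HashMap<>();
    // open shards should never be checked for emptiness
    final Set<String> openShards = new TreeSet<>();
    final List<String> emptinessChecks = new ArrayList<>();

    boolean isClosedShardEmpty(String shardId)
    {
      emptinessChecks.add(shardId);
      return closedShards.get(shardId);
    }
  }

  // Simplified supervisor: assumes "ignorable" means closed and empty,
  // and caches the emptiness result for each closed shard.
  static class FakeSupervisor
  {
    private final FakeRecordSupplier supplier;
    private final Map<String, Boolean> emptinessCache = new HashMap<>();

    FakeSupervisor(FakeRecordSupplier supplier)
    {
      this.supplier = supplier;
    }

    Set<String> getIgnorablePartitionIds()
    {
      Set<String> ignorable = new TreeSet<>();
      for (String shardId : supplier.closedShards.keySet()) {
        // Only shards missing from the cache reach the supplier.
        if (emptinessCache.computeIfAbsent(shardId, supplier::isClosedShardEmpty)) {
          ignorable.add(shardId);
        }
      }
      return ignorable;
    }
  }

  // Runs the scenario and returns the emptiness checks made by the second call.
  static List<String> secondCallChecks()
  {
    FakeRecordSupplier supplier = new FakeRecordSupplier();
    supplier.openShards.add("open");
    supplier.closedShards.put("closedEmpty", true);
    supplier.closedShards.put("closedNonEmpty", false);
    FakeSupervisor supervisor = new FakeSupervisor(supplier);

    supervisor.getIgnorablePartitionIds(); // first call checks both closed shards
    int checksSoFar = supplier.emptinessChecks.size();

    supplier.openShards.remove("open");    // step 5a: the open shard gets closed
    supplier.closedShards.put("open", true);
    supervisor.getIgnorablePartitionIds(); // only the newly closed shard is checked

    return new ArrayList<>(
        supplier.emptinessChecks.subList(checksSoFar, supplier.emptinessChecks.size())
    );
  }
}
```

The point of the sketch is that the assertions only go through
`getIgnorablePartitionIds()` and the calls seen by the supplier, so the cache
itself never has to be exposed to the test.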
##########
File path:
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
}
verifyAll();
}
+
+ @Test
+ public void testIsClosedShardEmpty()
+ {
+ AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+ KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+ recordsPerFetch,
+ 0,
+ 2,
+ false,
+ 100,
+ 5000,
+ 5000,
+ 60000,
+ 5,
+ true
+ );
+ Record record = new Record();
+ String shardId;
+
+ // No records and null iterator -> empty
+ shardId = "0";
+ isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+ Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+ // no records and non-null iterator -> non-empty
+ shardId = "1";
+ isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+ Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+ // non-empty records and null iterator -> non-empty
+ shardId = "2";
+ isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+ Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+ // non-empty records and non-null iterator -> non-empty
+ shardId = "3";
+ isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), "nextIterator");
+ Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+ }
+
+ private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId,
Review comment:
Rename this to something more descriptive of the actual contents of this
method.
e.g. `setupMockKinesisForShardId` or something.