kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r806501122



##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && 
!fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty
+   * @param stream to which shard belongs
+   * @param shardId of the closed shard
+   * @return if the closed shard is empty

Review comment:
       Nit:
   ```suggestion
      * @return true if the closed shard is empty, false otherwise
   ```

##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -481,4 +535,47 @@ protected boolean supportsPartitionExpiration()
 
     return new KinesisDataSourceMetadata(newSequences);
   }
+
+  /**
+   * A shard is considered closed iff it has an ending sequence number.
+   *
+   * @param shard to be checked
+   * @return if shard is closed
+   */
+  private boolean isShardClosed(Shard shard)
+  {
+    return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
+  }
+
+  /**
+   * Checking if a shard is empty requires polling for records which is quite 
expensive
+   * Fortunately, the results can be cached for closed shards as no more 
records can be written to them
+   * Please use this method only if the info is absent from the cache
+   *
+   * @param stream to which the shard belongs
+   * @param shardId of the shard
+   * @return if the shard is empty
+   */
+  private boolean isClosedShardEmpty(String stream, String shardId)
+  {
+    return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, 
shardId);

Review comment:
       Nit: Since this is a single-statement method that is used in only one 
place, we could inline it into the calling method. That might help readability.

##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -667,28 +666,31 @@ public String 
getEarliestSequenceNumber(StreamPartition<String> partition)
    * This makes the method resilient to LimitExceeded exceptions (compared to 
100 shards, 10 TPS of describeStream)
    *
    * @param stream name of stream
-   *
-   * @return Set of Shard ids
+   * @return Immutable set of shards
    */
+  public Set<Shard> getShards(String stream)
+  {
+    ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+    ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+    while (true) {
+      ListShardsResult result = kinesis.listShards(request);
+      shards.addAll(result.getShards());
+      String nextToken = result.getNextToken();
+      if (nextToken == null) {
+        return shards.build();
+      }
+      request = new ListShardsRequest().withNextToken(nextToken);
+    }
+  }
+
   @Override
   public Set<String> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      final Set<String> retVal = new TreeSet<>();
-      ListShardsRequest request = new 
ListShardsRequest().withStreamName(stream);
-      while (true) {
-        ListShardsResult result = kinesis.listShards(request);
-        retVal.addAll(result.getShards()
-                            .stream()
-                            .map(Shard::getShardId)
-                            .collect(Collectors.toList())
-        );
-        String nextToken = result.getNextToken();
-        if (nextToken == null) {
-          return retVal;
-        }
-        request = new ListShardsRequest().withNextToken(nextToken);
-      }
+      return ImmutableSet.copyOf(getShards(stream).stream()
+                                                  .map(shard -> 
shard.getShardId())
+                                                  .collect(Collectors.toList())

Review comment:
       Nit: Can we collect directly into an immutable Set instead of collecting 
into a List first and then converting it to a Set?

##########
File path: 
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws 
InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), 
"nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, 
Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";

Review comment:
       It doesn't seem like the `shardId` matters in these tests, because we 
are calling `EasyMock.reset(kinesis)` anyway.
   
   Option 1: If distinct shard IDs are not required, just use a single final `shardId`.
   
   Option 2 (preferred): Use different `shardIds` and do not call reset on the 
mock.

##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && 
!fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty

Review comment:
       Nit:
   I don't think we are really polling anything here.
   
   ```suggestion
      * Fetches records from the specified shard to determine if it is empty.
   ```

##########
File path: 
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -339,9 +342,9 @@ public void testNoInitialStateWithAutoScaleOut() throws 
Exception
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
-            new KinesisDataSourceMetadata(
-                    null
-            )
+        new KinesisDataSourceMetadata(

Review comment:
       Nit: If you are reformatting this, maybe just put the whole constructor 
on a single line.

##########
File path: 
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -1294,7 +1297,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws 
Exception
             .times(1);
 
 
-    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))

Review comment:
       Nit: Please try to avoid formatting changes unless they are relevant to 
the core set of changes.

##########
File path: 
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -4885,6 +4907,135 @@ private void testShardMergePhaseThree(List<Task> 
phaseTwoTasks) throws Exception
     Assert.assertEquals(expectedPartitionOffsets, 
supervisor.getPartitionOffsets());
   }
 
+  @Test
+  public void testUpdateClosedShardCache()
+  {
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
+    supervisor.setupRecordSupplier();
+    supervisor.tryInit();
+    String stream = supervisor.getKinesisSupervisorSpec().getSource();
+    Shard openShard = EasyMock.mock(Shard.class);
+    Shard emptyClosedShard = EasyMock.mock(Shard.class);
+    Shard nonEmptyClosedShard = EasyMock.mock(Shard.class);
+    Set<Shard> activeShards;
+    Set<String> emptyClosedShardIds;
+    Set<String> nonEmptyClosedShardIds;
+
+    // ITERATION 0:
+    // active shards: an open shard, closed-empty shard and closed-nonEmpty 
shard
+    activeShards = getActiveShards(openShard, true,
+                                   emptyClosedShard, true,
+                                   nonEmptyClosedShard, true);
+
+    EasyMock.reset(supervisorRecordSupplier);

Review comment:
       Rather than resetting the mock objects every time, you could do the 
following:
   1. create some open shards, closed empty shards and closed non-empty shards 
(and maybe an open empty shard too, just to be sure)
   2. setup the record supplier to return all of these when calling `getShards`
   3. setup the record supplier to return the correct values for 
`isClosedShardEmpty` for each shard
   4. call `supervisor.getIgnorablePartitionIds()` and verify the returned 
values
   5. in step 4, verify the shard ids for which `isClosedShardEmpty` is called 
on the mock record supplier
   (optional step 5a: move some shards from open to closed, empty to non-empty, 
add new shards, etc.)
   6. call `supervisor.getIgnorablePartitionIds()` again and verify the 
returned values
   7. In step 6, verify the shard ids for which `isClosedShardEmpty` is called 
on the mock record supplier
   
   I feel this would be easier to follow and more representative of what would 
happen in a real scenario. It would also help you avoid resetting the mock 
objects and avoid exposing the internal caches which are currently only visible 
for testing.

##########
File path: 
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws 
InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), 
"nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, 
Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";
+    isClosedShardEmptyHelper(mockKinesis, shardId, 
Collections.singletonList(record), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+  }
+
+  private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId,

Review comment:
       Rename this to something that describes what the method actually does, 
e.g. `setupMockKinesisForShardId` or similar.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to