abhishekagarwal87 commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r801700846



##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && 
!fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty
+   * @param stream to which shard belongs
+   * @param shardId of the shard
+   * @return if the shard is empty
+   */
+  public boolean isShardEmpty(String stream, String shardId)

Review comment:
       To be clear, this method will not return true for all the empty shards. 
it will return true for all the empty shards that are also closed. is that 
correct? 

##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,49 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().shouldSkipIgnorableShards();
+  }
+
+  /**
+   * Closed and empty shards can be ignored for ingestion,
+   * Use this method if skipIgnorablePartitions is true in the spec
+   *
+   * These partitions can be safely ignored for both ingesetion task 
assignment and autoscaler limits
+   *
+   * @return the set of ignorable shards' ids
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()
+  {
+    updateClosedShardCache();
+    return ImmutableSet.copyOf(emptyClosedShardIds);
+  }
+
+  private void updateClosedShardCache()
+  {
+    String stream = spec.getSource();
+    Set<Shard> allActiveShards = ((KinesisRecordSupplier) 
recordSupplier).getShards(stream);
+    Set<String> activeClosedShards = allActiveShards.stream()
+                                                    .filter(shard -> 
isShardOpen(shard))

Review comment:
       ```suggestion
                                                       .filter(shard -> 
!isShardOpen(shard))
   ```
   is that how it should be? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to